Socket配置参数的含义

Socket SoLinger

在Java Socket中,当我们调用Socket的close方法时,默认的行为是当底层网卡所有数据都发送完毕后,关闭连接
通过setSoLinger方法,我们可以修改close方法的行为

  1. setSoLinger(true, 0)
    1. 当网卡收到关闭连接请求后,无论数据是否发送完毕,立即发送RST包关闭连接
  2. setSoLinger(true, delay_time)

当网卡收到关闭连接请求后,等待delay_time
如果在delay_time过程中数据发送完毕,正常四次挥手关闭连接
如果在delay_time过程中数据没有发送完毕,发送RST包关闭连接

Socket 的写超时

  1. connect(SocketAddress endpoint, int timeout) 连接超时
  2. setSoTimeout(int timeout) 写时超时
  3. socket的写超时

socket的写超时是基于TCP的超时重传。超时重传是TCP保证数据可靠性传输的一个重要机制,其原理是在发送一个数据报文后就开启一个计时器,在一定时间内如果没有得到发送报文的确认ACK,那么就重新发送报文。如果重新发送多次之后,仍没有确认报文,就发送一个复位报文RST,然后关闭TCP连接。首次数据报文发送与复位报文传输之间的时间差大约为9分钟,也就是说如果9分钟内没有得到确认报文,就关闭连接。但是这个值是根据不同的TCP协议栈实现而不同。

如果发送端调用write持续地写出数据,直到SendQ队列被填满。如果在SendQ队列已满时调用write方法,则write将被阻塞,直到SendQ有新的空闲空间为止,也就是说直到一些字节传输到了接收者套接字的RecvQ中。如果此时RecvQ队列也已经被填满,所有操作都将停止,直到接收端调用read方法将一些字节传输到应用程序。
当Socket的write发送数据时,如果网线断开、对端进程崩溃或者对端机器重启动,TCP模块会重传数据,最后超时而关闭连接。下次如再调用write会导致一个异常而退出。

Socket写超时是基于TCP协议栈的超时重传机制,一般不需要设置write的超时时间,也没有提供这种方法。

Socket SoTimeout

  1. /**
  2. * Enable/disable {@link SocketOptions#SO_TIMEOUT SO_TIMEOUT}
  3. * with the specified timeout, in milliseconds. With this option set
  4. * to a non-zero timeout, a read() call on the InputStream associated with
  5. * this Socket will block for only this amount of time. If the timeout
  6. * expires, a <B>java.net.SocketTimeoutException</B> is raised, though the
  7. * Socket is still valid. The option <B>must</B> be enabled
  8. * prior to entering the blocking operation to have effect. The
  9. * timeout must be {@code > 0}.
  10. * A timeout of zero is interpreted as an infinite timeout.
  11. *
  12. * @param timeout the specified timeout, in milliseconds.
  13. * @exception SocketException if there is an error
  14. * in the underlying protocol, such as a TCP error.
  15. * @since JDK 1.1
  16. * @see #getSoTimeout()
  17. */
  18. public synchronized int getSoTimeout() throws SocketException {
  19. if (isClosed())
  20. throw new SocketException("Socket is closed");
  21. Object o = getImpl().getOption(SocketOptions.SO_TIMEOUT);
  22. /* extra type safety */
  23. if (o instanceof Integer) {
  24. return ((Integer) o).intValue();
  25. } else {
  26. return 0;
  27. }
  28. }
  29. public synchronized void setSoTimeout(int timeout) throws SocketException {
  30. if (isClosed())
  31. throw new SocketException("Socket is closed");
  32. if (timeout < 0)
  33. throw new IllegalArgumentException("timeout can't be negative");
  34. getImpl().setOption(SocketOptions.SO_TIMEOUT, new Integer(timeout));
  35. }
  1. public void setSoTimeout(int timeout) throws SocketException
  2. 使用指定的超时时间启用/禁用SO_TIMEOUT(以毫秒为单位)。 使用此选项设置为非零超时时,
  3. 与此Socket相关联的InputStream上的read()调用将仅阻止此时间。
  4. 如果超时超时,则引发java.net.SocketTimeoutException ,尽管Socket仍然有效。
  5. 必须先启用该选项才能进入阻止操作才能生效。 超时时间必须为> 0 超时为零被解释为无限超时。
  6. 参数: timeout - 指定的超时时间,以毫秒为单位。
  7. 异常: SocketException - 如果底层协议有错误,如TCP错误。
  1. /** Set a timeout on blocking Socket operations:
  2. * <PRE>
  3. * ServerSocket.accept();
  4. * SocketInputStream.read();
  5. * DatagramSocket.receive();
  6. * </PRE>
  7. *
  8. * <P> The option must be set prior to entering a blocking
  9. * operation to take effect. If the timeout expires and the
  10. * operation would continue to block,
  11. * <B>java.io.InterruptedIOException</B> is raised. The Socket is
  12. * not closed in this case.
  13. *
  14. * <P> Valid for all sockets: SocketImpl, DatagramSocketImpl
  15. *
  16. * @see Socket#setSoTimeout
  17. * @see ServerSocket#setSoTimeout
  18. * @see DatagramSocket#setSoTimeout
  19. */
  20. @Native public final static int SO_TIMEOUT = 0x1006;
  1. @Native
  2. static final int SO_TIMEOUT
  3. 在阻塞套接字操作时设置超时:
  4. ServerSocket.accept();
  5. SocketInputStream.read();
  6. DatagramSocket.receive();
  7. 必须先设置该选项才能进入阻止操作才能生效。 如果超时过期,并且操作将继续阻止,
  8. 则引发java.io.InterruptedIOException 在这种情况下,Socket不关闭。
  9. 适用于所有套接字:SocketImplDatagramSocketImpl
  10. 另请参见:
  11. Socket.setSoTimeout(int) ServerSocket.setSoTimeout(int)
  12. DatagramSocket.setSoTimeout(int) Constant Field Values

如果输入缓冲队列RecvQ中没有数据,read操作会一直阻塞而挂起线程,直到有新的数据到来或者有异常产生。调用setSoTimeout(int timeout)可以设置超时时间,如果到了超时时间仍没有数据,read会抛出一个SocketTimeoutException,程序需要捕获这个异常,但是当前的socket连接仍然是有效的。

如果对方进程崩溃、对方机器突然重启、网络断开,本端的read会一直阻塞下去,这时设置超时时间是非常重要的,否则调用read的线程会一直挂起。

TCP模块把接收到的数据放入RecvQ中,直到应用层调用输入流的read方法来读取。如果RecvQ队列被填满了,这时TCP会根据滑动窗口机制通知对方不要继续发送数据,本端停止接收从对端发送来的数据,直到接收者应用程序调用输入流的read方法后腾出了空间。


代码说明SoTimeout

  1. import java.io.IOException;
  2. import java.io.InputStream;
  3. import java.net.ServerSocket;
  4. import java.net.Socket;
  5. public class ServerMain {
  6. public static void main(String[] args) throws IOException {
  7. ServerSocket serverSocket = new ServerSocket(8888);
  8. long t1 = 0;
  9. try {
  10. Socket socket = serverSocket.accept();
  11. System.out.println("服务端接收到一个连接");
  12. t1 = System.currentTimeMillis();
  13. //设置该通道的read()方法超时
  14. socket.setSoTimeout(5000);
  15. InputStream inputStream = accept.getInputStream();
  16. //read阻塞
  17. inputStream.read();
  18. } finally {
  19. System.out.println("服务端setSoTimeout 耗时:"
  20. + (System.currentTimeMillis() - t1));
  21. }
  22. }
  23. }
  1. import java.io.InputStream;
  2. import java.net.InetSocketAddress;
  3. import java.net.Socket;
  4. public class ClientMain {
  5. public static void main(String[] args) throws Exception {
  6. Socket socket = new Socket();
  7. socket.connect(new InetSocketAddress(8888));
  8. //设置超时时间
  9. socket.setSoTimeout(10000);
  10. InputStream inputStream = socket.getInputStream();
  11. long t1 = System.currentTimeMillis();
  12. try {
  13. inputStream.read();
  14. } finally {
  15. System.out.println("客户端setSoTimeout 耗时:"
  16. + (System.currentTimeMillis() - t1));
  17. }
  18. }
  19. }

启动以后
服务端日志

  1. 服务端接收到一个连接
  2. 服务端setSoTimeout 耗时:5007
  3. Exception in thread "main" java.net.SocketTimeoutException: Read timed out
  4. at java.net.SocketInputStream.socketRead0(Native Method)
  5. at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
  6. at java.net.SocketInputStream.read(SocketInputStream.java:171)
  7. at java.net.SocketInputStream.read(SocketInputStream.java:141)
  8. at java.net.SocketInputStream.read(SocketInputStream.java:224)
  9. at cn.java.money.bio.demo9.ServerMain.main(ServerMain.java:21)

客户端日志

  1. 客户端setSoTimeout 耗时:5496
  2. Exception in thread "main" java.net.SocketException: Connection reset
  3. at java.net.SocketInputStream.read(SocketInputStream.java:210)
  4. at java.net.SocketInputStream.read(SocketInputStream.java:141)
  5. at java.net.SocketInputStream.read(SocketInputStream.java:224)
  6. at cn.java.money.bio.demo9.ClientMain.main(ClientMain.java:17)

说明:
soTimeout默认值是0,也就是没有超时时间,会无限的等待。
服务端抛出异常 java.net.SocketTimeoutException: Read timed out 是因为我们设置了soTimeout socket.setSoTimeout(5000); 但是客户端一致没有写数据,服务端读数据等待5000毫秒,超时就抛出异常SocketTimeoutException,抛出异常以后,该连接就会被关闭,向客户端发送了RST包。因此客户端收到的是java.net.SocketException: Connection reset (RST包)https://www.yuque.com/protocal/tcp/nq74g5

Socket ReceiveBufferSize SendBufferSize

  1. public synchronized int getReceiveBufferSize() throws SocketException{
  2. if (isClosed())
  3. throw new SocketException("Socket is closed");
  4. int result = 0;
  5. Object o = getImpl().getOption(SocketOptions.SO_RCVBUF);
  6. if (o instanceof Integer) {
  7. result = ((Integer)o).intValue();
  8. }
  9. return result;
  10. }
  11. public synchronized void setReceiveBufferSize(int size)
  12. throws SocketException{
  13. if (size <= 0) {
  14. throw new IllegalArgumentException("invalid receive size");
  15. }
  16. if (isClosed())
  17. throw new SocketException("Socket is closed");
  18. getImpl().setOption(SocketOptions.SO_RCVBUF, new Integer(size));
  19. }
  20. public synchronized int getSendBufferSize() throws SocketException {
  21. if (isClosed())
  22. throw new SocketException("Socket is closed");
  23. int result = 0;
  24. Object o = getImpl().getOption(SocketOptions.SO_SNDBUF);
  25. if (o instanceof Integer) {
  26. result = ((Integer)o).intValue();
  27. }
  28. return result;
  29. }
  30. public synchronized void setSendBufferSize(int size)
  31. throws SocketException{
  32. if (!(size > 0)) {
  33. throw new IllegalArgumentException("negative send size");
  34. }
  35. if (isClosed())
  36. throw new SocketException("Socket is closed");
  37. getImpl().setOption(SocketOptions.SO_SNDBUF, new Integer(size));
  38. }
  39. //java.net.SocketOptions
  40. @Native public final static int SO_RCVBUF = 0x1002;
  41. @Native public final static int SO_SNDBUF = 0x1001;

Socket打印版 - 图1

ServerSocket backlog

  1. public ServerSocket(int port, int backlog) throws IOException {
  2. this(port, backlog, null);
  3. }

backlog 最终作用于Linux的参数 tcp_max_syn_backlog

  1. [root@JD1 ~]# cat /proc/sys/net/ipv4/tcp_max_syn_backlog
  2. 256
  3. [root@JD1 ~]# sysctl -a|grep tcp_max_syn_backlog
  4. net.ipv4.tcp_max_syn_backlog = 256

tcp_max_syn_backlog 影响的是 半连接队列和全连接队列的大小
Socket打印版 - 图2
Socket打印版 - 图3
image.png
(somaxconn 推测是 socket max connection 缩写)

  1. [root@JD1 ~]# sysctl -a|grep net.core.somaxconn
  2. net.core.somaxconn = 128
  3. [root@JD1 ~]# cat /proc/sys/net/core/somaxconn
  4. 128

Socket打印版 - 图5
Socket打印版 - 图6

代码测试backlog

  1. import java.io.IOException;
  2. import java.io.InputStream;
  3. import java.net.ServerSocket;
  4. import java.net.Socket;
  5. public class ServerMain {
  6. public static void main(String[] args) throws IOException {
  7. //设置backlog为2, 最多只能有3个客户端(2个在全连接队列中+1个accept的)
  8. ServerSocket serverSocket = new ServerSocket(8888, 2);
  9. Socket accept = serverSocket.accept();
  10. InputStream inputStream = accept.getInputStream();
  11. inputStream.read();
  12. }
  13. }
  1. public class ClientMain {
  2. public static void main(String[] args) throws Exception {
  3. Socket socket = new Socket();
  4. socket.connect(new InetSocketAddress("127.0.0.1", 8888));
  5. InputStream inputStream = socket.getInputStream();
  6. inputStream.read();
  7. }
  8. }

Socket打印版 - 图7

Socket connectTimeOut

1. Socket connectTimeOut 在JDK中
  1. public Socket(InetAddress address, int port) throws IOException {
  2. this(address != null ? new InetSocketAddress(address, port) : null,
  3. (SocketAddress) null, true);
  4. }
  5. private Socket(SocketAddress address, SocketAddress localAddr,
  6. boolean stream) throws IOException {
  7. setImpl();
  8. // backward compatibility
  9. if (address == null)
  10. throw new NullPointerException();
  11. try {
  12. createImpl(stream);
  13. if (localAddr != null)
  14. bind(localAddr);
  15. connect(address);
  16. } catch (IOException | IllegalArgumentException | SecurityException e) {
  17. try {
  18. close();
  19. } catch (IOException ce) {
  20. e.addSuppressed(ce);
  21. }
  22. throw e;
  23. }
  24. }
  25. public void connect(SocketAddress endpoint) throws IOException {
  26. //默认超时时间为0
  27. connect(endpoint, 0);
  28. }
  29. /**
  30. * Connects this socket to the server with a specified timeout value.
  31. * A timeout of zero is interpreted as an infinite timeout. The connection
  32. * will then block until established or an error occurs.
  33. *
  34. * @param endpoint the {@code SocketAddress}
  35. * @param timeout the timeout value to be used in milliseconds.
  36. * @throws IOException if an error occurs during the connection
  37. * @throws SocketTimeoutException if timeout expires before connecting
  38. * @throws java.nio.channels.IllegalBlockingModeException
  39. * if this socket has an associated channel,
  40. * and the channel is in non-blocking mode
  41. * @throws IllegalArgumentException if endpoint is null or is a
  42. * SocketAddress subclass not supported by this socket
  43. * @since 1.4
  44. * @spec JSR-51
  45. */
  46. public void connect(SocketAddress endpoint, int timeout) throws IOException {
  47. //省略
  48. }
  49. //java.net.DualStackPlainSocketImpl#connect0
  50. static native int connect0(int fd, InetAddress remote, int remotePort) throws IOException;

Socket打印版 - 图8
默认超时时间为0,意味着没有设置超时时间,连接是不是过期的。

2. Socket 连接建立超时表现在TCP协议

socket连接建立是基于TCP的连接建立过程。TCP的连接需要通过3次握手报文来完成,开始建立TCP连接时需要发送同步SYN报文,然后等待确认报文SYN+ACK,最后再发送确认报文ACK。TCP连接的关闭通过4次挥手来完成,主动关闭TCP连接的一方发送FIN报文,等待对方的确认报文;被动关闭的一方也发送FIN报文,然等待确认报文。
Socket打印版 - 图9
正在等待TCP连接请求的一端有一个固定长度的连接队列,该队列中的连接已经被TCP接受(即三次握手已经完成),但还没有被应用层所接受。TCP接受一个连接是将其放入这个连接队列,而应用层接受连接是将其从该队列中移出。应用层可以通过设置backlog变量来指明该连接队列的最大长度,即已被TCP接受而等待应用层接受的最大连接数。

当一个连接请求SYN到达时,TCP确定是否接受这个连接。如果队列中还有空间,TCP模块将对SYN进行确认并完成连接的建立。但应用层只有在三次握手中的第三个报文收到后才会知道这个新连接。如果队列没有空间,TCP将不理会收到的SYN。

如果应用层不能及时接受已被TCP接受的连接,这些连接可能占满整个连接队列,新的连接请求可能不被响应而会超时。如果一个连接请求SYN发送后,一段时间后没有收到确认SYN+ACK,TCP会重传这个连接请求SYN两次,每次重传的时间间隔加倍,在规定的时间内仍没有收到SYN+ACK,TCP将放弃这个连接请求,连接建立就超时了。

JAVA Socket连接建立超时和TCP是相同的,如果TCP建立连接时三次握手超时,那么导致Socket连接建立也就超时了。可以设置Socket连接建立的超时时间 connect(SocketAddress endpoint, int timeout)

如果在timeout内,连接没有建立成功,在TimeoutException异常被抛出。如果timeout的值小于三次握手的时间,那么Socket连接永远也不会建立。

不同的应用层有不同的连接建立过程,Socket的连接建立和TCP一样-仅仅需要三次握手就完成连接,但有些应用程序需要交互很多信息后才能成功建立连接,比如Telnet协议,在TCP三次握手完成后,需要进行选项协商之后,Telnet连接才建立完成。

3. 代码测试
  1. import java.net.InetSocketAddress;
  2. import java.net.Socket;
  3. public class ClientMain {
  4. public static void main(String[] args) throws Exception {
  5. Socket socket = new Socket();
  6. long t1 = System.currentTimeMillis();
  7. // 设置 connection timeout (单位毫秒)
  8. try {
  9. //ip地址是随意写的
  10. //socket.connect(new InetSocketAddress("192.168.2.145",8888));
  11. //设置超时时间
  12. socket.connect(new InetSocketAddress("192.168.2.145",8888), 5000);
  13. } finally {
  14. System.out.println("客户端connection Timeout 耗时:"
  15. + (System.currentTimeMillis() - t1) + "毫秒");
  16. }
  17. }
  18. }

设置了 timout

客户端connection Timeout 耗时:5011毫秒 Exception in thread “main” java.net.SocketTimeoutException: connect timed out at java.net.PlainSocketImpl.socketConnect(Native Method) at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:476) at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:218) at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:200) at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:394) at java.net.Socket.connect(Socket.java:606) at ClientMain.main(ClientMain.java:11)

未设置timout, 为什么是20秒左右,不知道原因。将来慢慢研究一下。

客户端connection Timeout 耗时:21010毫秒 Exception in thread “main” java.net.ConnectException: 拒绝连接 (Connection refused) at java.net.PlainSocketImpl.socketConnect(Native Method) at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:476) at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:218) at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:200) at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:394) at java.net.Socket.connect(Socket.java:606) at java.net.Socket.connect(Socket.java:555) at ClientMain.main(ClientMain.java:10)

netsh int tcp show global
image.png

Socket keepalive

  • Socket keepalive的配置本质是TCP的保活机制。
  • Socket是客户端和服务端建立的虚拟通信通道。客户端的Socket和服务端的Socket逻辑上是同一个。
  • Java Socket编程中有个keepalive选项不是用来表示长链接的。
  • socket 连接建立之后,只要双方均未主动关闭连接,那这个连接就是会一直保持的,就是持久的连接
    keepalive 只是为了防止连接的双方发生意外而通知不到对方,导致一方还持有连接,占用资源。

    java.net.Socket#setKeepAlive ```java public void setKeepAlive(boolean on) throws SocketException { if (isClosed())

    1. throw new SocketException("Socket is closed");

    getImpl().setOption(SocketOptions.SO_KEEPALIVE, Boolean.valueOf(on)); }

public boolean getKeepAlive() throws SocketException { if (isClosed()) throw new SocketException(“Socket is closed”); return ((Boolean) getImpl().getOption(SocketOptions.SO_KEEPALIVE)).booleanValue(); }

  1. ```java
  2. public interface SocketOptions {
  3. /**
  4. * When the keepalive option is set for a TCP socket and no data
  5. * has been exchanged across the socket in either direction for
  6. * 2 hours (NOTE: the actual value is implementation dependent),
  7. * TCP automatically sends a keepalive probe to the peer. This probe is a
  8. * TCP segment to which the peer must respond.
  9. * One of three responses is expected:
  10. * 1. The peer responds with the expected ACK. The application is not
  11. * notified (since everything is OK). TCP will send another probe
  12. * following another 2 hours of inactivity.
  13. * 2. The peer responds with an RST, which tells the local TCP that
  14. * the peer host has crashed and rebooted. The socket is closed.
  15. * 3. There is no response from the peer. The socket is closed.
  16. *
  17. * The purpose of this option is to detect if the peer host crashes.
  18. *
  19. * Valid only for TCP socket: SocketImpl
  20. *
  21. * @see Socket#setKeepAlive
  22. * @see Socket#getKeepAlive
  23. */
  24. @Native public final static int SO_KEEPALIVE = 0x0008; //0x0008表示的是操作id
  25. }

源码注释的意思是,如果这个连接上双方任意方向在2小时之内没有发送过数据,那么tcp会自动发送一个探测探测包给对方,这种探测包对方是必须回应的,回应结果有三种:

  1. 正常ACK,继续保持连接;
  2. 对方响应RST信号,双方重新连接。
  3. 对方无响应。

这里说的两小时,其实是依赖于系统配置,在linux系统中(windows在注册表中,可以自行查询资料),tcp的keepalive参数。

  1. [root@JD1 ~]# sysctl -a|grep tcp_keepalive
  2. net.ipv4.tcp_keepalive_intvl = 75
  3. net.ipv4.tcp_keepalive_probes = 9
  4. net.ipv4.tcp_keepalive_time = 7200
  1. net.ipv4.tcp_keepalive_intvl = 75
    1. 发送探测包的周期,前提是当前连接一直没有数据交互,才会以该频率进行发送探测包,如果中途有数据交互,则会重新计时tcp_keepalive_time,到达规定时间没有数据交互,才会重新以该频率发送探测包
  2. net.ipv4.tcp_keepalive_probes = 9
    1. 探测失败的重试次数,发送探测包达次数限制对方依旧没有回应,则关闭自己这端的连接
  3. net.ipv4.tcp_keepalive_time = 7200
    1. 空闲多长时间,则发送探测包

可以通过 sysctl -w net.ipv4.tcp_keepalive_time=60进行修改,执行sysctl -p刷新配置生效;
可以通过修改/etc/sysctl.conf永久生效


当建立TCP链接后,如果应用程序或者上层协议一直不发送数据,或者隔很长一段时间才发送数据,当链接很久没有数据报文传输时就需要通过keepalive机制去确定对方是否在线,链接是否需要继续保持。当超过一定时间没有发送数据时,TCP会自动发送一个数据为空的报文给对方,如果对方回应了报文,说明对方在线,链接可以继续保持,如果对方没有报文返回,则在重试一定次数之后认为链接丢失,就不会释放链接。

控制对闲置连接的检测机制,链接闲置达到7200秒,就开始发送探测报文进行探测。

net.ipv4.tcp_keepalive_time:单位秒,表示发送探测报文之前的链接空闲时间,默认为7200。
net.ipv4.tcp_keepalive_intvl:单位秒,表示两次探测报文发送的时间间隔,默认为75。
net.ipv4.tcp_keepalive_probes:表示探测的次数,默认9次。


为了能验证所说的,我们来进行测试一下,本人测试环境是客户端在本地windows上,服务端是在远程linux上,主要测试服务器端向客户端发送探测包(客户端向服务端发送是一样的原理,这里测试服务器端到客户端的原因是我们修改了服务端的keep-alive便于观察)。

  1. 首先需要装一个抓包工具,本人用的wireshark;
  2. 然后修改一下tcp_keepalive_time系统配置,改成1分钟,2小时太长了,其余配置不变。

修改方法:执行sysctl -w net.ipv4.tcp_keepalive_time=60进行修改,执行sysctl -p刷新配置生效;

  1. 最后写一个服务器端和一个客户端,分别启动。

服务器端代码如下(java8):

  1. import java.io.IOException;
  2. import java.io.InputStream;
  3. import java.io.OutputStream;
  4. import java.net.ServerSocket;
  5. import java.net.Socket;
  6. public class Server {
  7. public static void main(String[] args) throws IOException {
  8. ServerSocket ss = new ServerSocket(12345);
  9. while (true) {
  10. //建立虚拟连接通道
  11. Socket socket = ss.accept();
  12. new Thread(() -> {
  13. try {
  14. //开启keppAlive机制
  15. socket.setKeepAlive(true);
  16. socket.setReceiveBufferSize(8 * 1024);
  17. socket.setSendBufferSize(8 * 1024);
  18. InputStream is = socket.getInputStream();
  19. OutputStream os = socket.getOutputStream();
  20. try {
  21. byte[] bytes = new byte[1024];
  22. while (is.read(bytes) > -1) {
  23. System.out.println(System.currentTimeMillis()
  24. + " received message: "
  25. + new String(bytes, "UTF-8").trim());
  26. os.write("ok".getBytes("UTF-8"));
  27. os.flush();
  28. bytes = new byte[1024];
  29. }
  30. } catch (IOException e) {
  31. e.printStackTrace();
  32. } finally {
  33. if (!socket.isInputShutdown()) {
  34. socket.shutdownInput();
  35. }
  36. if (!socket.isOutputShutdown()) {
  37. socket.shutdownOutput();
  38. }
  39. if (!socket.isClosed()) {
  40. socket.close();
  41. }
  42. }
  43. } catch (IOException e) {
  44. e.printStackTrace();
  45. }
  46. }).start();
  47. }
  48. }
  49. }

客户端代码如下:

  1. public class Client {
  2. public static void main(String[] args) throws IOException, InterruptedException {
  3. Socket socket = new Socket("192.168.16.84", 12345);
  4. //开启tcp的keepAlive机制
  5. socket.setKeepAlive(true);
  6. socket.setSendBufferSize(8192);
  7. socket.setReceiveBufferSize(8192);
  8. InputStream is = socket.getInputStream();
  9. OutputStream os = socket.getOutputStream();
  10. os.write("get test-key".getBytes("UTF-8"));
  11. os.flush();
  12. Thread.sleep(155 * 1000L);
  13. os.write("get test-key".getBytes("UTF-8"));
  14. os.flush();
  15. byte[] bytes = new byte[1024];
  16. while (is.read(bytes) > -1) {
  17. System.out.println(System.currentTimeMillis()
  18. + " received message: "
  19. + new String(bytes, "UTF-8").trim());
  20. bytes = new byte[1024];
  21. }
  22. if (!socket.isOutputShutdown()) {
  23. socket.shutdownOutput();
  24. }
  25. if (!socket.isInputShutdown()) {
  26. socket.shutdownInput();
  27. }
  28. if (!socket.isClosed()) {
  29. socket.close();
  30. }
  31. }
  32. }

分别启动服务端和客户端之后,抓包工具抓到的数据:
image.png
可以看到,60秒时服务器发送了探测包,探测客户端是否正常,客户端正常响应了,之后以tcp_keepalive_intvl(75秒)的周期进行发送,可以看到135秒又进行发送了探测包。

但是因为我们客户端的代码是在155秒重新发送了数据,所以需要继续空闲60秒,直到215秒才继续发送探测包,后续没有数据交互,所以还是以75秒间隔频率进行发送探测包。从抓包的数据上很容易看出来。

keepalive默认是关闭的,下面我们把服务器端的socket.setKeepAlive(true)一行注释掉的抓包结果:
Socket打印版 - 图12
可以看到服务器端没有向客户端发送探测包,其实客户端设置了socket.setKeepAlive(true),客户端在7355(7200+155)秒时应该会向服务器发送探测包(我把程序挂了2小时。。。结果如下)
image.png
验证无误。


windows下的tcp keepalive
缺省情况下,如果空闲连接在7200000毫秒(2小时)内没有活动,系统就会发送保持连接的消息。 具体操作:浏览至HKEY_LOCAL_MACHINE\SYSTEM\CurrentControlSet\Services\TCPIP\Parameters 注册表子键,在Parameters子键下创建或修改名为KeepAliveTime的REG_DWORD值,为该值设置适当的毫秒数。


socket 连接建立之后,只要双方均未主动关闭连接,那这个连接就是会一直保持的,就是持久的连接
keepalive 只是为了防止连接的双方发生意外而通知不到对方,导致一方还持有连接,占用资源。

其实这个选项的意思是TCP连接空闲时是否需要向对方发送探测包,实际上是依赖于底层的TCP模块实现的,java中只能设置是否开启,不能设置其详细参数,只能依赖于系统配置。

keepalive 不是说TCP的长连接,当我们作为服务端,一个客户端连接上来,如果设置了keeplive为 true,当对方没有发送任何数据过来,超过一个时间(看系统内核参数配置),那么我们这边会发送一个ack探测包发到对方,探测双方的TCP/IP连接是否有效(对方可能断电,断网)。如果不设置,那么客户端宕机时,服务器永远也不知道客户端宕机了,仍然保存这个失效的连接。

当然,在客户端也可以使用这个参数。客户端Socket会每隔段的时间(大约两个小时)就会利用空闲的连接向服务器发送一个数据包。这个数据包并没有其它的作用,只是为了检测一下服务器是否仍处于活动状态。如果服务器未响应这个数据包,在大约11分钟后,客户端Socket再发送一个数据包,如果在12分钟内,服务器还没响应,那么客户端Socket将关闭。如果将Socket选项关闭,客户端Socket在服务器无效的情况下可能会长时间不会关闭。

Windows Socket 最大连接数

TCP/IP 协议规定的,只用了2个字节表示端口号。容易让人误解为1个server只允许连接65535个Client。
typedef struct _NETWORK_ADDRESS_IP
{
USHORT sin_port;//0~65535
ULONG in_addr;
UCHAR sin_zero[8];
} NETWORK_ADDRESS_IP, *PNETWORK_ADDRESS_IP;
(1)其实65535这个数字,只是决定了服务器端最多可以拥有65535个Bind的Socket。也就是说,最多可以开65535个服务器进程,但是你要知道这个能够连接客户端的数量没有任何关系,Accept过来的Socket是不需要Bind任何IP地址的,也没有端口占用这一说。作为Server端的Socket本身只负责监听和接受连接操作。
(2)TCP协议里面是用[源IP+源Port+目的IP+目的 Port]来区别两个不同连接,所以连入和连出是两个不同的概念。连出Connect就不错了,需要生成随机端口,这个是有限的连入的话, 因SOCKET的分配受内存分页限制,而连接受限制(WINDOWS)。
(3)所以,千万不要误以为1个server只允许连接65535个Client。记住,TCP连出受端口限制,连入仅受内存限制。
例如server,IP:192.168.16.254,Port:8009
Client1:IP:192.168.16.1,Port:2378
Client2:IP:192.168.16.2,Port:2378
Client1和Client2虽然Port相同,但是IP不同,所以是不同的连接。
(4)想让1个server并发高效得连接几万个Client,需要使用IOCP“完成端口(Completion Port)”的技术。
详情请参考文章:http://blog.csdn.net/libaineu2004/article/details/40087167

connect timeout和so timeout的应用

1 . MySQL jdbc timeout

查阅MySQL Connector/J 5.1 Developer Guide 中的jdbc配置参数,有

connectTimeout Timeout for socket connect (in milliseconds), with 0 being no timeout. Only works on JDK-1.4 or newer. Defaults to ‘0’. Default: 0 Since version: 3.0.1
socketTimeout Timeout on network socket operations (0, the default means no timeout). Default: 0 Since version: 3.0.1

这两个参数分别就是对应上面我们分析的connect timeout和so timeout。
参数的设置方法有两种,一种是通过url设置,

  1. jdbc:mysql://[host1][:port1][,[host2][:port2]]...[/[database]]
  2. [?propertyName1=propertyValue1[&propertyName2=propertyValue2]...]

即在url后面通过?加参数,比如jdbc:mysql://192.168.1.1:3306/test?connectTimeout=2000&socketTime=2000
还有一种方式是:

  1. Properties info = new Properties();
  2. info.put("user", this.username);
  3. info.put("password", this.password);
  4. info.put("connectTimeout", "2000");
  5. info.put("socketTime", "2000");
  6. return DriverManager.getConnection(this.url, info);

2. Jedis timeout

Jedis是最流行的redis java客户端工具,redis.clients.jedis.Jedis对象的构造器中就有参数设置,

  1. public Jedis(final String host, final int port, final int connectionTimeout,
  2. final int soTimeout) {
  3. super(host, port, connectionTimeout, soTimeout);
  4. }
  1. // 用一个参数timeout同时设置connect timeout 和 so timeout
  2. public Jedis(final String host, final int port, final int timeout) {
  3. super(host, port, timeout);
  4. }

Jedis中so timeout个人觉得是有比较重要意义的,首先jedis so timeout默认值为2000毫秒,jedis的操作流程是客户端发送命令给客户端执行,然后客户端就开始执行InputStream.read()读取响应,当某个命令比较耗时(比如数据非常多的情况下执行“keys *”),而导致客户端迟迟没有收到响应,就可能导致java.net.SocketTimeoutException: Read timed out异常抛出。一般是不建议客户端执行非常耗时的命令,但是也不排除有这种特殊逻辑,那这时候就有可能需要修改Jeids中这个so timeout的值。

NIO源码

1. NIO 模型代码

image.png
NIO 的 selector解决了很多连接不需要遍历每一个Channel的问题。
selector.select() 阻塞等待需要处理的事件发生。

  1. package cn.java.money.nio.demo2;
  2. import java.io.IOException;
  3. import java.net.InetSocketAddress;
  4. import java.nio.ByteBuffer;
  5. import java.nio.channels.SelectionKey;
  6. import java.nio.channels.Selector;
  7. import java.nio.channels.ServerSocketChannel;
  8. import java.nio.channels.SocketChannel;
  9. import java.util.Iterator;
  10. public class Server {
  11. public static void main(String[] args) throws IOException {
  12. //首先的有一个通道,接受连接
  13. ServerSocketChannel ssChannel = ServerSocketChannel.open();
  14. ssChannel.configureBlocking(false);
  15. ssChannel.bind(new InetSocketAddress(8888));
  16. Selector selector = Selector.open();
  17. ssChannel.register(selector, SelectionKey.OP_ACCEPT);
  18. //select会在这里阻塞
  19. while (selector.select() > 0) {
  20. //通过Selector,只遍历有事件的channel去处理
  21. Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
  22. while (iterator.hasNext()) {
  23. SelectionKey selectionKey = iterator.next();
  24. if (selectionKey.isAcceptable()) {
  25. SocketChannel socketChannel = ssChannel.accept();
  26. socketChannel.configureBlocking(false);
  27. socketChannel.register(selector, SelectionKey.OP_READ);
  28. } else if (selectionKey.isReadable()) {
  29. SocketChannel channel = (SocketChannel) selectionKey.channel();
  30. ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
  31. int length = 0;
  32. //channel.read(byteBuffer)是把数据读入byteBuffer,此时byteBuffer的模式是写入模式
  33. // 如果某个通道的数据量很多,这里就会长时间处理,就会影响其他channel
  34. while ((length = channel.read(byteBuffer)) > 0) {
  35. byteBuffer.flip();
  36. //相当于读取数据,从buffer中读取数据
  37. System.out.println(new String(byteBuffer.array(), 0, length));
  38. byteBuffer.clear();
  39. }
  40. }
  41. //事件处理完要移除
  42. iterator.remove();
  43. }
  44. }
  45. }
  46. }
  1. package cn.java.money.nio.demo2;
  2. import java.io.IOException;
  3. import java.net.InetSocketAddress;
  4. import java.nio.ByteBuffer;
  5. import java.nio.channels.SocketChannel;
  6. import java.util.Scanner;
  7. public class Client {
  8. public static void main(String[] args) throws IOException {
  9. SocketChannel socketChannel = SocketChannel.open(
  10. new InetSocketAddress("127.0.0.1", 8888));
  11. socketChannel.configureBlocking(false);
  12. ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
  13. Scanner scanner = new Scanner(System.in);
  14. while (true){
  15. String s = scanner.nextLine();
  16. //buffer写模式
  17. byteBuffer.put(s.getBytes());
  18. //切换为读模式
  19. byteBuffer.flip();
  20. //写到channel,就是从buffer中读取
  21. socketChannel.write(byteBuffer);
  22. byteBuffer.clear();
  23. }
  24. }
  25. }

2. NIO 源码 Linux版本代码

https://gitee.com/framework-src/openjdk-1.8-b132.git

2.1 Selector.open()
  1. Selector selector = Selector.open();
  2. public static Selector open() throws IOException {
  3. return SelectorProvider.provider().openSelector();
  4. }
  5. public static SelectorProvider provider() {
  6. return sun.nio.ch.DefaultSelectorProvider.create();
  7. }
  8. 不同操作系统的JDK提供了不同的 DefaultSelectorProvider
  9. 不同的DefaultSelectorProvider返回不同的SelectorProvider的实现
  10. --- Windown 版本JDK
  11. WindowsSelectorProvider
  12. WindowsSelectorImpl
  13. --- Linux 版本JDK
  14. EPollSelectorProvider
  15. EPollSelectorImpl

不同操作系统的JDK提供了不同的 DefaultSelectorProvider
image.png
solaris的 DefaultSelectorProvider

  1. public static SelectorProvider create() {
  2. String osname = AccessController
  3. .doPrivileged(new GetPropertyAction("os.name"));
  4. if (osname.equals("SunOS"))
  5. return createProvider("sun.nio.ch.DevPollSelectorProvider");
  6. if (osname.equals("Linux"))
  7. return createProvider("sun.nio.ch.EPollSelectorProvider");
  8. return new sun.nio.ch.PollSelectorProvider();
  9. }

sun.nio.ch.EPollSelectorProvider

  1. public class EPollSelectorProvider extends SelectorProviderImpl
  2. {
  3. public AbstractSelector openSelector() throws IOException {
  4. return new EPollSelectorImpl(this);
  5. }
  6. public Channel inheritedChannel() throws IOException {
  7. return InheritedChannel.getChannel();
  8. }
  9. }
  1. EPollSelectorImpl(SelectorProvider sp) throws IOException {
  2. super(sp);
  3. long pipeFds = IOUtil.makePipe(false);
  4. fd0 = (int) (pipeFds >>> 32);
  5. fd1 = (int) pipeFds;
  6. pollWrapper = new EPollArrayWrapper();
  7. pollWrapper.initInterrupt(fd0, fd1);
  8. fdToKey = new HashMap<>();
  9. }
  1. EPollArrayWrapper() throws IOException {
  2. // creates the epoll file descriptor
  3. epfd = epollCreate();
  4. // the epoll_event array passed to epoll_wait
  5. int allocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT;
  6. pollArray = new AllocatedNativeObject(allocationSize, true);
  7. pollArrayAddress = pollArray.address();
  8. // eventHigh needed when using file descriptors > 64k
  9. if (OPEN_MAX > MAX_UPDATE_ARRAY_SIZE)
  10. eventsHigh = new HashMap<>();
  11. }
  12. //native方法 是 epoll最核心的几个方法由linux操作系统实现
  13. private native int epollCreate(); //返回epoll文件描述符
  14. //epfd是epoll的文件描述符,对应Selector
  15. //fd是ServerSocketChannel的文件描述符
  16. private native void epollCtl(int epfd, int opcode, int fd, int events);
  17. private native int epollWait(long pollAddress, int numfds, long timeout, int epfd)
  18. throws IOException;

epoll_create(256) 返回文件描述符
image.png
epoll_create 打开一个epoll文件的描述符。c 语言创建的epoll的实例,就是结构体,用于存储数据。
epfd 是返回的文件描述符。
image.png

2.2 ssChannel.register(selector, SelectionKey.OP_ACCEPT);

sun.nio.ch.WindowsSelectorImpl#implRegister
sun.nio.ch.EPollSelectorImpl#implRegister

  1. protected void implRegister(SelectionKeyImpl ski) {
  2. if (closed)
  3. throw new ClosedSelectorException();
  4. SelChImpl ch = ski.channel;
  5. int fd = Integer.valueOf(ch.getFDVal());
  6. fdToKey.put(fd, ski);
  7. // fd 就是ServerSocketChannel的文件描述符
  8. pollWrapper.add(fd);
  9. keys.add(ski);
  10. }

2.3 selector.select()

sun.nio.ch.WindowsSelectorImpl#doSelect
sun.nio.ch.EPollSelectorImpl#doSelect

  1. protected int doSelect(long timeout) throws IOException {
  2. if (closed)
  3. throw new ClosedSelectorException();
  4. processDeregisterQueue();
  5. try {
  6. begin();
  7. // 轮询
  8. pollWrapper.poll(timeout);
  9. } finally {
  10. end();
  11. }
  12. processDeregisterQueue();
  13. int numKeysUpdated = updateSelectedKeys();
  14. if (pollWrapper.interrupted()) {
  15. // Clear the wakeup pipe
  16. pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
  17. synchronized (interruptLock) {
  18. pollWrapper.clearInterrupted();
  19. IOUtil.drain(fd0);
  20. interruptTriggered = false;
  21. }
  22. }
  23. return numKeysUpdated;
  24. }

sun.nio.ch.EPollArrayWrapper#poll

  1. int poll(long timeout) throws IOException {
  2. //epollCtl
  3. updateRegistrations();
  4. //epollWai 是一个nativea方法
  5. updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
  6. for (int i=0; i<updated; i++) {
  7. if (getDescriptor(i) == incomingInterruptFD) {
  8. interruptedIndex = i;
  9. interrupted = true;
  10. break;
  11. }
  12. }
  13. return updated;
  14. }
  15. private void updateRegistrations() {
  16. epollCtl(epfd, opcode, fd, events);
  17. }
  18. private native void epollCtl(int epfd, int opcode, int fd, int events);
  19. private native int epollWait(long pollAddress, int numfds, long timeout, int epfd)
  20. throws IOException;

3. select poll epoll的区别

image.png