TCP(Transmission Control Protocol),即传输控制协议。是一种面向连接的、可靠的、基于字节流的传输层通信协议。不同于UDP,TCP更像是提供一种可靠的、像管道一样的连接。
Java中的TCP主要涉及ServerSocket和Socket两个类。前者被认为是服务端的一个实体,用于接受连接。后者则被认为是连接的一种封装,用于传输数据,类似于一个管道。

v2-1457b3a97628fb108b41d7b7d5a0eabc_1440w.jpg

假设我现在要写一个程序,给另一台计算机发数据,必须通过tcp/ip协议 ,但具体的实现过程是什么呢?我应该怎么操作才能把数据封装成tcp/ip的包,又执行什么指令才能把数据发到对端机器上呢? 不能只有世界观,没有方法论呀。。。此时,socket隆重登场,简而言之,socket这个东东干的事情,就是帮你把tcp/ip协议层的各种数据封装啦、数据发送、接收等通过代码已经给你封装好了,你只需要调用几行代码,就可以给别的机器发消息了。

Socket基础知识概括

什么是socket?

socket是应用层与TCP/IP协议族通信的中间软件抽象层,它是一组接口。在设计模式中,Socket其实就是一个门面模式,它把复杂的TCP/IP协议族隐藏在Socket接口后面,对用户来说,一组简单的接口就是全部。
v2-66e689978267d0bdbde41c183b063b2b_r.jpg

socket起源于Unix,而Unix/Linux 基本哲学之一就是“一切皆文件”,都可以用“打开open –> 读写write/read –> 关闭close”模式 来操作。Socket就是该模式的一个实现,socket即是一种特殊的文件,一些socket函数就是对其进行的操作(读/写IO、打开、关闭)

你想给另一台计算机发消息,你知道他的IP地址,他的机器上同时运行着qq、迅雷、word、浏览器等程序,你想给他的qq发消息,那想一下,你现在只能通过ip找到他的机器,但如果让这台机器知道把消息发给qq程序呢?答案就是通过port,一个机器上可以有0-65535个端口,你的程序想从网络上收发数据,就必须绑定一个端口,这样,远程发到这个端口上的数据,就全会转给这个程序啦

v2-123aeb78d4a367410d0ee0156b5942ae_r.jpg

Socket通信套路(人脑思维)

当通过socket建立起2台机器的连接后,本质上socket只干2件事,一是收数据,一是发数据,没数据时就等着。

socket 建立连接的过程跟我们现实中打电话比较像,打电话必须是打电话方和接电话方共同完成的事情,我们分别看看他们是怎么建立起通话的

接电话方:

  1. 首先你得有个电话
  2. 你的电话要有号码
  3. 你的电话必须连上电话线
  4. 开始在家等电话
  5. 电话铃响了,接起电话,听到对方的声音

打电话方:

  1. 首先你得有个电话
  2. 输入你想拨打的电话
  3. 等待对方接听
  4. say“hi 约么,我有七天酒店的打折卡噢~”
  5. 等待回应——》响应回应——》等待回应。。。。

把它翻译成scoket通信

接电话方(socket服务器端):

  1. 首先你得有个电话(生成ServerSocket 对象)
  2. 你的电话要有号码(绑定本机ip+port)
  3. 你的电话必须连上电话线(连网)
  4. 开始在家等电话(accept())
  5. 电话铃响了,接起电话,听到对方的声音(接受新连接)

打电话方(socket客户端):

  1. 首先你得有个电话(生成socket对象)
  2. 输入你想拨打的电话(connect 远程主机ip+port)
  3. 等待对方接听
  4. say“hi 约么,我有七天酒店的打折卡噢~”(发消息。。。)
  5. 等待回应——》响应回应——》等待回应。。。。

v2-25e3ac927206fcd43f1091fd62b0c779_r.jpg

ServerSocket(服务器)

ServerSocket 类是与 Socket 类相对应的用于表示通信双方中的服务器端,用于在服务器上开一个端口,被动地等待数据(使用 accept() 方法)并建立连接进行数据交互。

ServerSocket 的构造方法

ServerSocket 的构造方法如下所示。

  • ServerSocket():无参构造方法。
  • ServerSocket(int port):创建绑定到特定端口的服务器套接字。
  • ServerSocket(int port,int backlog):使用指定的 backlog 创建服务器套接字并将其绑定到指定的本地端口。
  • ServerSocket(int port,int backlog,InetAddress bindAddr):使用指定的端口、监听 backlog 和要绑定到本地的 IP 地址创建服务器。

在上述方法的参数中 port 指的是本地 TCP 端口,backlog 指的是监听 backlog,bindAddr 指的是要将服务器绑定到的 InetAddress。

ServerSocket 的常用方法

ServerSocket 的常用方法如下所示。

  • Server accept():监听并接收到此套接字的连接。
  • void bind(SocketAddress endpoint):将 ServerSocket 绑定到指定地址(IP 地址和端口号)。
  • void close():关闭此套接字。
  • InetAddress getInetAddress():返回此服务器套接字的本地地址。
  • int getLocalPort():返回此套接字监听的端口。
  • SocketAddress getLocalSoclcetAddress():返回此套接字绑定的端口的地址,如果尚未绑定则返回 null。
  • int getReceiveBufferSize():获取此 ServerSocket 的 SO_RCVBUF 选项的值,该值是从 ServerSocket 接收的套接字的建议缓冲区大小。

调用 accept() 方法会返回一个和客户端 Socket 对象相连接的 Socket 对象,服务器端的 Socket 对象使用 getOutputStream() 方法获得的输出流将指定客户端 Socket 对象使用 getInputStream() 方法获得那个输入流。同样,服务器端的 Socket 对象使用的 getInputStream() 方法获得的输入流将指向客户端 Socket 对象使用的 getOutputStream() 方法获得的那个输出流。也就是说,当服务器向输出流写入信息时,客户端通过相应的输入流就能读取,反之同样如此。

Socket (客户端)

Socket 类表示通信双方中的客户端,用于呼叫远端机器上的一个端口,主动向服务器端发送数据(当连接建立后也能接收数据)。

Socket 的构造方法

Socket的构造方法如下所示。

  • Socket():无参构造方法。
  • Socket(InetAddress address,int port):创建一个流套接字并将其连接到指定 IP 地址的指定端口。
  • Soclcet(InetAddress address,int port,InetAddress localAddr,int localPort):创建一个套接字并将其连接到指定远程地址上的指定远程端口。
  • Socket(String host,int port):创建一个流套接字并将其连接到指定主机上的指定端口。
  • Socket(String host,int port,InetAddress localAddr,int localPort):创建一个套接字并将其连接到指定远程地址上的指定远程端口。Socket 会通过调用 bind() 函数来绑定提供的本地地址及端口。

在上述方法的参数中,address 指的是远程地址,port 指的是远程端口,localAddr 指的是要将套接字绑定到的本地地址,localPort 指的是要将套接字绑定到的本地端口。

Socket 的常用方法

Socket的常用方法如下所示。

  • void bind(SocketAddress bindpoint):将套接字绑定到本地地址。
  • void close():关闭此套接字。
  • void connect(SocketAddress endpoint):将此套接字连接到服务器。
  • InetAddress getInetAddress():返回套接字的连接地址。
  • InetAddress getLocalAddress():获取套接字绑定的本地地址。
  • InputStream getInputStream():返回此套接字的输入流。
  • OutputStream getOutputStream():返回此套接字的输出流。
  • SocketAddress getLocalSocketAddress():返回此套接字绑定的端点地址,如果尚未绑定则返回 null。
  • SocketAddress getRemoteSocketAddress():返回此套接字的连接的端点地址,如果尚未连接则返回 null。
  • int getLoacalPort():返回此套接字绑定的本地端口。
  • intgetPort():返回此套接字连接的远程端口。

    Spring Boot 搭建TCP Socket

    业务场景是模拟室温体温传感器发送环境温度和体温数据通过网关传送到服务器,服务器接收消息
    使用的技术栈:
    springboot + socket +(并发场景+线程池)

maven依赖导入

  1. <!--字节转换-->
  2. <dependency>
  3. <groupId>commons-codec</groupId>
  4. <artifactId>commons-codec</artifactId>
  5. <version>1.12</version>
  6. </dependency>
  7. <dependency>
  8. <groupId>org.projectlombok</groupId>
  9. <artifactId>lombok</artifactId>
  10. <version>1.16.20</version>
  11. <scope>provided</scope>
  12. </dependency>

首先启动服务端socket,创建 SocketServer

  1. @Slf4j
  2. @Data
  3. @Component
  4. @NoArgsConstructor
  5. public class SocketServer {
  6. private Integer port = 9090;
  7. private boolean started;
  8. private ServerSocket serverSocket;
  9. // 防止重复创建socket线程链接对象浪费资源
  10. private ExecutorService executorService = Executors.newCachedThreadPool();
  11. public void start(){
  12. start(null);
  13. }
  14. public void start(Integer port){
  15. log.info("port: {}, {}", this.port, port);
  16. try {
  17. serverSocket = new ServerSocket(port == null ? this.port : port);
  18. started = true;
  19. log.info("Socket服务已启动,占用端口: {}", serverSocket.getLocalPort());
  20. } catch (IOException e) {
  21. log.error("端口冲突,异常信息:{}", e);
  22. System.exit(0);
  23. }
  24. while (true){
  25. try {
  26. // 开启socket监听
  27. Socket socket = serverSocket.accept();
  28. ClientSocket register = register(socket);
  29. // 在此判断是否重复创建socket对象线程
  30. if (register != null){
  31. executorService.submit(register);
  32. }
  33. } catch (Exception e) {
  34. e.printStackTrace();
  35. }
  36. }
  37. }
  38. }

该类主要是启动socket监听端口,并开启监听创建ClientSocket对象

创建全局Map存放现以链接服务器的socket SocketPool

public class SocketPool {

    private static final ConcurrentHashMap<String, ClientSocket> ONLINE_SOCKET_MAP = new ConcurrentHashMap<>();


    public static void add(ClientSocket clientSocket){
        if (clientSocket != null && !clientSocket.getKey().isEmpty())
            ONLINE_SOCKET_MAP.put(clientSocket.getKey(), clientSocket);
    }

    public static void remove(String key){
        if (!key.isEmpty())
            ONLINE_SOCKET_MAP.remove(key);
    }
}

封装socket对象 ClientSocket 每个链接都是一个ClientSocket对象.我们可以在此类中和客户端进行收到等操作达到监听效果.也可以将收到的数据保存到数据库中,以便后续业务需求.

@Slf4j
@Data
public class ClientSocket implements Runnable {

    private Socket socket;
    private DataInputStream inputStream;
    private DataOutputStream outputStream;
    private String key;
    private String message;

    @Override
    public void run() {
        while (true){
            try {
                onMessage(this);
                log.info(LocalDateTime.now()+"当前设备:"+this.key+" 接收到数据: <<<<<<" + this.message);
            } catch (Exception e) {
                e.printStackTrace();
            }
            if (isSocketClosed(this)){
                log.info("客户端已关闭,其Key值为:{}", this.getKey());
                //关闭对应的服务端资源
                close(this);
                break;
            }
        }
    }
}

创建socket任务处理器 SocketHandler,里面封装读写数据,销毁链接,等方法.

@Slf4j
public class SocketHandler {

    /**
     * 将连接的Socket注册到Socket池中
     * @param socket
     * @return
     */
    public static ClientSocket register(Socket socket){
        ClientSocket clientSocket = new ClientSocket();
        clientSocket.setSocket(socket);
        try {
            clientSocket.setInputStream(new DataInputStream(socket.getInputStream()));
            clientSocket.setOutputStream(new DataOutputStream(socket.getOutputStream()));
            byte[] bytes = new byte[1024];
            clientSocket.getInputStream().read(bytes);
            clientSocket.setKey(new String(bytes, "utf-8"));
            add(clientSocket);
            return clientSocket;
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 向指定客户端发送信息
     * @param clientSocket
     * @param message
     */

    public static void sendMessage(ClientSocket clientSocket, String message){
        try {
            log.info("发送消息到客户端  : >>>>>" + message);
            clientSocket.getOutputStream().write(message.getBytes("utf-8"));
            //clientSocket.getOutputStream().writeUTF(message);
        } catch (IOException e) {
            log.error("发送信息异常:{}", e);
            close(clientSocket);
        }
    }

    /**
     * 获取指定客户端的上传信息
     * @param clientSocket
     * @return
     */
    public static void onMessage(ClientSocket clientSocket){
        byte[] keyByte = new byte[1024];
        byte[] msgByte = new byte[1];
        try {
            // 第一次先发送序列号
            if(StringUtils.isEmpty(clientSocket.getKey())) {
                clientSocket.getInputStream().read(keyByte);
                clientSocket.setKey(new String(keyByte, "UTF-8"));
                //return clientSocket.getKey();
                // 以后发送数据
            }else {
                String info = "";
                while (true) {
                    if (clientSocket.getInputStream().available() > 0) {
                        clientSocket.getInputStream().read(msgByte);
                        String tempStr = HexEcodeUtil.ByteArrayToHexStr(msgByte);
                        info += tempStr;
                        //已经读完
                        if (clientSocket.getInputStream().available() == 0) {
                            //重置,不然每次收到的数据都会累加起来
                            clientSocket.setMessage(info);
                            break;
                        }
                    }
                }
                //return clientSocket.getMessage();
            }
        } catch (IOException e) {
            e.printStackTrace();
            close(clientSocket);
        }
        //return null;
    }

    /**
     * 指定Socket资源回收
     * @param clientSocket
     */
    public static void close(ClientSocket clientSocket){
        log.info("进行资源回收");
        if (clientSocket != null){
            log.info("开始回收socket相关资源,其Key为{}", clientSocket.getKey());
            remove(clientSocket.getKey());
            Socket socket = clientSocket.getSocket();
            try {
                socket.shutdownInput();
                socket.shutdownOutput();
            } catch (IOException e) {
                log.error("关闭输入输出流异常,{}", e);
            }finally {
                try {
                    socket.close();
                } catch (IOException e) {
                    log.error("关闭socket异常{}", e);
                }
            }
        }
    }


    /**
     * 发送数据包,判断数据连接状态
     * @param clientSocket
     * @return
     */
    public static boolean isSocketClosed(ClientSocket clientSocket){
        try {
            clientSocket.getSocket().sendUrgentData(1);
            return false;
        } catch (IOException e) {
            return true;
        }
    }
}

最后一步将socket随spring容器启动类一同启动


public static void main(String[] args) {
        ApplicationContext run = SpringApplication.run(CollectionApplication.class, args);
        run.getBean(SocketServer.class).start();

    }

至此demo基本完成,有人会奇怪前面pom导入的ommons-codec干嘛用的,这是遇到的坑,因为网关设备推送的数据是16进制码,String在读取数据转码的时候会乱码,而设备序列号却不会乱码….一度困扰我不知道怎么回事,后来查资料反应过来可能是编码的问题,于是又写了个转码的util(如果数据包不是16进制码完全用不到ommons-codec包)

util

/**
 * 16进制转换工具
 */
@Slf4j
public class HexEcodeUtil {

    //16进制数字字符集
    public static final String HEXMAXSTRING = "0123456789ABCDEF";
    public static final String HEXMINSTRING = "0123456789abcdef";
    /**
     * byte[]转16进制Str
     *
     * @param byteArray
     */
    public static String ByteArrayToHexStr(byte[] byteArray){
        if (byteArray == null)
            return null;
        char[] hexArray = HEXMAXSTRING.toCharArray();
        char[] hexChars = new char[byteArray.length * 2];
        for (int i = 0; i < byteArray.length; i++){
            int temp = byteArray[i] & 0xFF;
            hexChars[i * 2] = hexArray[temp >>> 4];
            hexChars[i * 2 + 1] = hexArray[temp & 0x0F];
        }
        return new String(hexChars);
    }

    /**
     * Str转16进制Str
     *
     * @param str
     * @return
     */
    public static String StrToHexStr(String str) {
        //根据默认编码获取字节数组
        byte[] bytes = str.getBytes();
        StringBuilder stringBuilder = new StringBuilder(bytes.length * 2);
        //将字节数组中每个字节拆解成2位16进制整数
        for (int i = 0; i < bytes.length; i++){
            stringBuilder.append("0x");
            stringBuilder.append(HEXMAXSTRING.charAt((bytes[i] & 0xf0) >> 4));
            stringBuilder.append(HEXMAXSTRING.charAt((bytes[i] & 0x0f) >> 0));
            //去掉末尾的逗号
            if (i != bytes.length - 1)
                stringBuilder.append(",");
        }
        return stringBuilder.toString();
    }

    /**
     * 16进制Str转byte[]
     *
     * @param hexStr 不带空格、不带0x、不带逗号的16进制Str,如:06EEF7F1
     * @return
     */
    public static byte[] HexStrToByteArray(String hexStr){
        byte[] byteArray = new byte[hexStr.length() / 2];
        for (int i = 0; i < byteArray.length; i++){
            String subStr = hexStr.substring(2 * i, 2 * i + 2);
            byteArray[i] = ((byte) Integer.parseInt(subStr, 16));
        }
        return byteArray;
    }

    /**
     * 温度湿度数据转换
     *
     */
    public static Map<String , String> HexToRead(String info){
        HashMap<String, String> hashMap = new HashMap<>();

        return hashMap;
    }

    /**
     * 将16进制字符串转换为byte数组
     * @param hexItr 16进制字符串
     * @return
     */
    public static byte[] hexItr2Arr(String hexItr) {
     /*   try {
            *//*return Hex.decodeHex(hexItr);*//*
        } catch (DecoderException e) {
            log.info("16进制字符串转byte异常!");
            e.printStackTrace();
        }
        return null;*/
        return null;
    }
    /**
     * 16进制转换成为string类型字符串
     * @param s
     * @return
     */
    public static String hexStringToString(String s) {
        if (s == null || s.equals("")) {
            return null;
        }
        s = s.replace(" ", "");
        byte[] baKeyword = new byte[s.length() / 2];
        for (int i = 0; i < baKeyword.length; i++) {
            try {
                baKeyword[i] = (byte) (0xff & Integer.parseInt(s.substring(i * 2, i * 2 + 2), 16));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        try {
            s = new String(baKeyword, "UTF-8");
            new String();
        } catch (Exception e1) {
            e1.printStackTrace();
        }
        return s;
    }
    /**
     * 16进制直接转换成为字符串(无需Unicode解码)
     * @param hexStr
     * @return
     */
    public static String hexStr2Str(String hexStr) {
        String str = "0123456789ABCDEF";
        char[] hexs = hexStr.toCharArray();
        byte[] bytes = new byte[hexStr.length() / 2];
        int n;
        for (int i = 0; i < bytes.length; i++) {
            n = str.indexOf(hexs[2 * i]) * 16;
            n += str.indexOf(hexs[2 * i + 1]);
            bytes[i] = (byte) (n & 0xff);
        }
        return new String(bytes);

    }
    public static void main(String[] args) {
        //byte[] bytes = hexItr2Arr("454E383739465134563249393936394F");
        byte[] bytes = hexItr2Arr("010300000002C40B010304012202585B5F");
        try {
            String s = new String(bytes, "UTF-8");
            log.info(s);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}