1 返回一个特定的网页
from socket import AF_INET, SO_REUSEADDR, SOCK_STREAM, SOL_SOCKET, socket
from threading import Thread

# Fixed page returned for every request.
PAGE_BODY = "<h1>MacBookPro</h1>"


def serve_client(client_socket):
    """Read one request from *client_socket* and answer with a fixed page.

    The request bytes are only logged; every client receives the same
    ``200 OK`` response.  The socket is always closed before returning,
    including when the peer resets the connection mid-exchange.

    Args:
        client_socket: Connected socket returned by ``accept()``.
    """
    try:
        recv_data = client_socket.recv(1024)
        print("客户端发送的请求:", recv_data)  # 5

        print("回复客户端...")  # 6
        body = PAGE_BODY.encode("utf-8")
        # HTTP header lines end with CRLF; Content-Length and
        # "Connection: close" tell the client where the body ends.
        header = (
            "HTTP/1.1 200 OK\r\n"
            "Content-Type: text/html; charset=utf-8\r\n"
            "Content-Length: {}\r\n"
            "Connection: close\r\n"
            "\r\n"
        ).format(len(body))
        # sendall() keeps writing until every byte is out; send() may stop early.
        client_socket.sendall(header.encode("utf-8") + body)
    except OSError as exc:
        print("与客户端通信出错:", exc)
    finally:
        print("关闭套接字...")  # 7
        client_socket.close()


# Guarded so that importing this module does not start the server.
if __name__ == "__main__":
    print("创建套接字...")  # 1
    # The context manager closes the listening socket when the loop is
    # interrupted (e.g. by Ctrl-C).
    with socket(AF_INET, SOCK_STREAM) as tcp_socket:
        # Let the port be rebound immediately after the program exits.
        tcp_socket.setsockopt(SOL_SOCKET, SO_REUSEADDR, True)
        print("绑定端口...")  # 2
        tcp_socket.bind(("", 16055))
        print("变为监听套接字,只能被动接收...")  # 3
        tcp_socket.listen(128)
        while True:
            print("接受客户端请求,分配一个客服套接字...")  # 4
            client_socket, client_addr = tcp_socket.accept()
            print("客户端信息:", client_addr)
            sub_thread = Thread(target=serve_client, args=(client_socket,), daemon=True)
            sub_thread.start()

2 返回浏览器请求的html
import os
import re
from socket import AF_INET, SO_REUSEADDR, SOCK_STREAM, SOL_SOCKET, socket

# Directory (relative to the working directory) that holds the served files.
HTML_ROOT = "./html"


def _requested_file(request):
    """Return the URL path on the request line, defaulting to the index page."""
    request_lines = request.splitlines()
    ret = re.match(r"[^/]+(/[^ ]*)", request_lines[0]) if request_lines else None
    file_name = ret.group(1) if ret else ""
    # Default page.
    if file_name in ("", "/"):
        file_name = "/index.html"
    return file_name


def _read_document(file_name):
    """Return the bytes of *file_name* under HTML_ROOT, or None if unavailable."""
    try:
        root = os.path.realpath(HTML_ROOT)
        target = os.path.realpath(os.path.join(root, file_name.lstrip("/")))
        # Refuse paths such as "/../secret" that resolve outside the root.
        if os.path.commonpath([root, target]) != root:
            return None
        with open(target, "rb") as f:
            return f.read()
    except (OSError, ValueError):  # missing, a directory, unreadable, bad name
        return None


def serve_client(client_socket):
    """Serve one HTTP request for a file under ``./html`` and close the socket.

    Replies ``200 OK`` with the file's bytes, or ``404 NOT FOUND`` when the
    file cannot be read or lies outside the document root.  A client that
    disconnects without sending anything is dropped silently.

    Args:
        client_socket: Connected socket returned by ``accept()``.
    """
    try:
        request = client_socket.recv(1024).decode("utf-8", errors="replace")
        print("客户端发送的请求:", request)  # 5
        if not request:
            return  # peer closed without sending a request

        # Read the file name out of the request line.
        file_name = _requested_file(request)
        print("-------------", file_name)

        print("回复客户端...")  # 6
        html_content = _read_document(file_name)
        if html_content is None:  # file not found
            status = "404 NOT FOUND"
            html_content = "-----file not found-----".encode("utf-8")
        else:  # success
            status = "200 OK"
        # The header is text and the body is bytes, so encode the header
        # first and send both with sendall(), which never stops half-way.
        header = (
            "HTTP/1.1 {}\r\n"
            "Content-Length: {}\r\n"
            "Connection: close\r\n"
            "\r\n"
        ).format(status, len(html_content))
        client_socket.sendall(header.encode("utf-8") + html_content)
    except OSError as exc:
        print("与客户端通信出错:", exc)
    finally:
        print("关闭套接字...")  # 7
        client_socket.close()


# Guarded so that importing this module does not start the server.
if __name__ == "__main__":
    print("创建套接字...")  # 1
    with socket(AF_INET, SOCK_STREAM) as tcp_socket:
        # Let the port be rebound immediately after the program exits.
        tcp_socket.setsockopt(SOL_SOCKET, SO_REUSEADDR, True)
        print("绑定端口...")  # 2
        tcp_socket.bind(("", 16055))
        print("变为监听套接字,只能被动接收...")  # 3
        tcp_socket.listen(128)
        while True:
            print("接受客户端请求,分配一个客服套接字...")  # 4
            client_socket, client_addr = tcp_socket.accept()
            print("客户端信息:", client_addr)
            serve_client(client_socket)
3 多进程实现
import os
import re
from multiprocessing import Process
from socket import AF_INET, SO_REUSEADDR, SOCK_STREAM, SOL_SOCKET, socket

# Directory (relative to the working directory) that holds the served files.
HTML_ROOT = "./html"


def _requested_file(request):
    """Return the URL path on the request line, or None if it has none."""
    request_lines = request.splitlines()
    ret = re.match(r"[^/]+(/[^ ]*)", request_lines[0]) if request_lines else None
    if ret is None:
        return None
    file_name = ret.group(1)
    # Default page.
    if file_name == "/":
        file_name = "/index.html"
    return file_name


def _read_document(file_name):
    """Return the bytes of *file_name* under HTML_ROOT, or None if unavailable."""
    if file_name is None:
        return None
    try:
        root = os.path.realpath(HTML_ROOT)
        target = os.path.realpath(os.path.join(root, file_name.lstrip("/")))
        # Refuse paths such as "/../secret" that resolve outside the root.
        if os.path.commonpath([root, target]) != root:
            return None
        with open(target, "rb") as f:
            return f.read()
    except (OSError, ValueError):  # missing, a directory, unreadable, bad name
        return None


def serve_client(client_socket):
    """Serve one HTTP request for a file under ``./html`` and close the socket.

    Replies ``200 OK`` with the file's bytes, or ``404 NOT FOUND`` when the
    request line has no path, the file cannot be read, or it lies outside
    the document root.  A client that disconnects without sending anything
    is dropped silently.

    Args:
        client_socket: Connected socket returned by ``accept()``.
    """
    try:
        request = client_socket.recv(1024).decode("utf-8", errors="replace")
        print("客户端发送的请求:", request)  # 5
        if not request:
            return  # peer closed without sending a request

        # Read the file name out of the request line.
        file_name = _requested_file(request)
        if file_name is not None:
            print("-------------", file_name)

        print("回复客户端...")  # 6
        html_content = _read_document(file_name)
        if html_content is None:  # file not found
            status = "404 NOT FOUND"
            html_content = "-----file not found-----".encode("utf-8")
        else:  # success
            status = "200 OK"
        # The header is text and the body is bytes, so encode the header
        # first and send both with sendall(), which never stops half-way.
        header = (
            "HTTP/1.1 {}\r\n"
            "Content-Length: {}\r\n"
            "Connection: close\r\n"
            "\r\n"
        ).format(status, len(html_content))
        client_socket.sendall(header.encode("utf-8") + html_content)
    except OSError as exc:
        print("与客户端通信出错:", exc)
    finally:
        print("关闭套接字...")  # 7
        client_socket.close()


if __name__ == "__main__":
    print("创建套接字...")  # 1
    with socket(AF_INET, SOCK_STREAM) as tcp_socket:
        # Let the port be rebound immediately after the program exits.
        tcp_socket.setsockopt(SOL_SOCKET, SO_REUSEADDR, True)
        print("绑定端口...")  # 2
        tcp_socket.bind(("", 16054))
        print("变为监听套接字,只能被动接收...")  # 3
        tcp_socket.listen(128)
        while True:
            print("接受客户端请求,分配一个客服套接字...")  # 4
            client_socket, client_addr = tcp_socket.accept()
            print("客户端信息:", client_addr)
            try:
                p = Process(target=serve_client, args=(client_socket,))
                p.start()
            finally:
                # The child holds its own copy of the socket; the parent
                # must close its copy or the connection never finishes.
                client_socket.close()
4 多线程实现
import os
import re
from socket import AF_INET, SO_REUSEADDR, SOCK_STREAM, SOL_SOCKET, socket
from threading import Thread

# Directory (relative to the working directory) that holds the served files.
HTML_ROOT = "./html"


def _requested_file(request):
    """Return the URL path on the request line, or None if it has none."""
    request_lines = request.splitlines()
    ret = re.match(r"[^/]+(/[^ ]*)", request_lines[0]) if request_lines else None
    if ret is None:
        return None
    file_name = ret.group(1)
    # Default page.
    if file_name == "/":
        file_name = "/index.html"
    return file_name


def _read_document(file_name):
    """Return the bytes of *file_name* under HTML_ROOT, or None if unavailable."""
    if file_name is None:
        return None
    try:
        root = os.path.realpath(HTML_ROOT)
        target = os.path.realpath(os.path.join(root, file_name.lstrip("/")))
        # Refuse paths such as "/../secret" that resolve outside the root.
        if os.path.commonpath([root, target]) != root:
            return None
        with open(target, "rb") as f:
            return f.read()
    except (OSError, ValueError):  # missing, a directory, unreadable, bad name
        return None


def serve_client(client_socket):
    """Serve one HTTP request for a file under ``./html`` and close the socket.

    Replies ``200 OK`` with the file's bytes, or ``404 NOT FOUND`` when the
    request line has no path, the file cannot be read, or it lies outside
    the document root.  A client that disconnects without sending anything
    is dropped silently.

    Args:
        client_socket: Connected socket returned by ``accept()``.
    """
    try:
        request = client_socket.recv(1024).decode("utf-8", errors="replace")
        print("客户端发送的请求:", request)  # 5
        if not request:
            return  # peer closed without sending a request

        # Read the file name out of the request line.
        file_name = _requested_file(request)
        if file_name is not None:
            print("-------------", file_name)

        print("回复客户端...")  # 6
        html_content = _read_document(file_name)
        if html_content is None:  # file not found
            status = "404 NOT FOUND"
            html_content = "-----file not found-----".encode("utf-8")
        else:  # success
            status = "200 OK"
        # The header is text and the body is bytes, so encode the header
        # first and send both with sendall(), which never stops half-way.
        header = (
            "HTTP/1.1 {}\r\n"
            "Content-Length: {}\r\n"
            "Connection: close\r\n"
            "\r\n"
        ).format(status, len(html_content))
        client_socket.sendall(header.encode("utf-8") + html_content)
    except OSError as exc:
        print("与客户端通信出错:", exc)
    finally:
        print("关闭套接字...")  # 7
        client_socket.close()


if __name__ == "__main__":
    print("创建套接字...")  # 1
    with socket(AF_INET, SOCK_STREAM) as tcp_socket:
        # Let the port be rebound immediately after the program exits.
        tcp_socket.setsockopt(SOL_SOCKET, SO_REUSEADDR, True)
        print("绑定端口...")  # 2
        tcp_socket.bind(("", 16054))
        print("变为监听套接字,只能被动接收...")  # 3
        tcp_socket.listen(128)
        while True:
            print("接受客户端请求,分配一个客服套接字...")  # 4
            client_socket, client_addr = tcp_socket.accept()
            print("客户端信息:", client_addr)
            # Daemon threads do not keep the process alive after Ctrl-C.
            p = Thread(target=serve_client, args=(client_socket,), daemon=True)
            p.start()
5 gevent实现
import gevent
from gevent import monkey

# Patch the standard library BEFORE any name is imported from `socket`.
# Names bound earlier (e.g. via `from socket import ...`) would keep
# pointing at the blocking classes, and accept()/recv() would stall every
# greenlet.
monkey.patch_all()

import os  # noqa: E402
import re  # noqa: E402
import time  # noqa: E402
from socket import AF_INET, SO_REUSEADDR, SOCK_STREAM, SOL_SOCKET, socket  # noqa: E402

# Directory (relative to the working directory) that holds the served files.
HTML_ROOT = "./html"


def _requested_file(request):
    """Return the URL path on the request line, or None if it has none."""
    request_lines = request.splitlines()
    ret = re.match(r"[^/]+(/[^ ]*)", request_lines[0]) if request_lines else None
    if ret is None:
        return None
    file_name = ret.group(1)
    # Default page.
    if file_name == "/":
        file_name = "/index.html"
    return file_name


def _read_document(file_name):
    """Return the bytes of *file_name* under HTML_ROOT, or None if unavailable."""
    if file_name is None:
        return None
    try:
        root = os.path.realpath(HTML_ROOT)
        target = os.path.realpath(os.path.join(root, file_name.lstrip("/")))
        # Refuse paths such as "/../secret" that resolve outside the root.
        if os.path.commonpath([root, target]) != root:
            return None
        with open(target, "rb") as f:
            return f.read()
    except (OSError, ValueError):  # missing, a directory, unreadable, bad name
        return None


def serve_client(client_socket):
    """Serve one HTTP request for a file under ``./html`` and close the socket.

    Replies ``200 OK`` with the file's bytes, or ``404 NOT FOUND`` when the
    request line has no path, the file cannot be read, or it lies outside
    the document root.  A client that disconnects without sending anything
    is dropped silently.

    Args:
        client_socket: Connected socket returned by ``accept()``.
    """
    try:
        request = client_socket.recv(1024).decode("utf-8", errors="replace")
        print("客户端发送的请求:", request)  # 5
        if not request:
            return  # peer closed without sending a request

        # Read the file name out of the request line.
        file_name = _requested_file(request)
        if file_name is not None:
            print("-------------", file_name)

        print("回复客户端...")  # 6
        html_content = _read_document(file_name)
        if html_content is None:  # file not found
            status = "404 NOT FOUND"
            html_content = "-----file not found-----".encode("utf-8")
        else:  # success
            status = "200 OK"
        # The header is text and the body is bytes, so encode the header
        # first and send both with sendall(), which never stops half-way.
        header = (
            "HTTP/1.1 {}\r\n"
            "Content-Length: {}\r\n"
            "Connection: close\r\n"
            "\r\n"
        ).format(status, len(html_content))
        client_socket.sendall(header.encode("utf-8") + html_content)
    except OSError as exc:
        print("与客户端通信出错:", exc)
    finally:
        print("关闭套接字...")  # 7
        client_socket.close()


if __name__ == "__main__":
    print("创建套接字...")  # 1
    with socket(AF_INET, SOCK_STREAM) as tcp_socket:
        tcp_socket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
        print("绑定端口...")  # 2
        tcp_socket.bind(("", 7890))
        print("变为监听套接字,只能被动接收...")  # 3
        tcp_socket.listen(128)
        while True:
            print("接受客户端请求,分配一个客服套接字...")  # 4
            client_socket, client_addr = tcp_socket.accept()
            print("客户端信息:", client_addr)
            gevent.spawn(serve_client, client_socket)
            # time.sleep is monkey-patched, so a zero-length sleep is a
            # cooperative yield that lets the new greenlet start at once.
            time.sleep(0)
6 面向对象实现
#!/usr/bin/env python
import os
import sys
from socket import SO_REUSEADDR, SOL_SOCKET, socket
from threading import Thread


class HttpServer():
    """Minimal threaded HTTP server that serves files from ``static/``."""

    # Directory (relative to the working directory) holding the served files.
    static_root = 'static'

    def __init__(self, port):
        """Create a listening socket bound to *port* on all interfaces."""
        self.server = socket()
        self.server.setsockopt(SOL_SOCKET, SO_REUSEADDR, True)
        self.server.bind(('', port))
        self.server.listen()

    def _read_static(self, path):
        """Return the bytes of *path* under ``static_root``, or None if unavailable."""
        try:
            root = os.path.realpath(self.static_root)
            target = os.path.realpath(os.path.join(root, path.lstrip('/')))
            # Refuse paths such as "/../secret" that resolve outside the root.
            if os.path.commonpath([root, target]) != root:
                return None
            with open(target, 'rb') as f:
                return f.read()
        except (OSError, ValueError):  # missing, a directory, unreadable, bad name
            return None

    def handle_request(self, conn: socket):
        """Answer a single HTTP request on *conn*, then close the connection.

        Replies ``200 OK`` with the requested file, ``404 Not Found`` (using
        ``static/404.html`` when it exists, otherwise a built-in page) or
        ``400 Bad Request`` when the request line has no path.  An empty
        read or a receive error closes the connection without a reply.
        """
        try:
            try:
                data = conn.recv(4096)
            except OSError:
                print('接收数据发生异常')
                return
            if not data:
                return

            content = data.decode(errors='replace')
            request_lines = content.split(' ', 2)
            if len(request_lines) < 2:
                # No path after the method: nothing sensible to serve.
                status = '400 Bad Request'
                response_body = b'<h1>400 Bad Request</h1>'
            else:
                path = request_lines[1]
                if path == '/':
                    path = '/index.html'
                response_body = self._read_static(path)
                if response_body is not None:
                    status = '200 OK'
                else:
                    status = '404 Not Found'
                    response_body = self._read_static('/404.html')
                    if response_body is None:
                        response_body = b'<h1>404 Not Found</h1>'

            # Content-Length plus "Connection: close" tell the client where
            # the body ends; sendall() never stops half-way like send() may.
            response_head = (
                'HTTP/1.1 {}\r\n'
                'Server: CPALyth/1.1\r\n'
                'Content-Length: {}\r\n'
                'Connection: close\r\n'
                '\r\n'
            ).format(status, len(response_body))
            conn.sendall(response_head.encode() + response_body)
        except OSError as exc:
            print('发送响应发生异常:', exc)
        finally:
            conn.close()

    def start(self):
        """Accept connections forever, handling each one in a daemon thread."""
        while True:
            print('等待客户端...')
            conn, addr = self.server.accept()
            Thread(target=self.handle_request, args=(conn,), daemon=True).start()


if __name__ == '__main__':
    web_server = HttpServer(8000)
    web_server.start()
7 实现上传图片
(1) 服务端
#!/usr/bin/env python
import base64
import binascii
import os
import re
from socket import SO_REUSEADDR, SOL_SOCKET, socket
from threading import Thread


class HttpServer():
    """Threaded HTTP server: GET serves ``static/`` files, POST stores a PNG."""

    # Directory (relative to the working directory) holding the served files.
    static_root = 'static'
    # 'images/png' is what the bundled client sends; 'image/png' is the
    # registered MIME type, accepted as well.
    png_types = ('images/png', 'image/png')

    def __init__(self, port):
        """Create a listening socket bound to *port* on all interfaces."""
        self.server = socket()
        self.server.setsockopt(SOL_SOCKET, SO_REUSEADDR, True)
        self.server.bind(('', port))
        self.server.listen()

    def _respond(self, conn, status, body, content_type=None):
        """Send one response; *body* is bytes and is framed by Content-Length."""
        head = 'HTTP/1.1 {}\r\nServer: CPALyth/1.1\r\n'.format(status)
        if content_type:
            head += 'Content-Type: {}\r\n'.format(content_type)
        # The connection stays open between requests, so the client needs
        # Content-Length to know where this response ends.
        head += 'Content-Length: {}\r\n\r\n'.format(len(body))
        conn.sendall(head.encode() + body)

    def _read_static(self, path):
        """Return the bytes of *path* under ``static_root``, or None if unavailable."""
        try:
            root = os.path.realpath(self.static_root)
            target = os.path.realpath(os.path.join(root, path.lstrip('/')))
            # Refuse paths such as "/../secret" that resolve outside the root.
            if os.path.commonpath([root, target]) != root:
                return None
            with open(target, 'rb') as f:
                return f.read()
        except (OSError, ValueError):  # missing, a directory, unreadable, bad name
            return None

    def _dispatch(self, conn, data):
        """Parse the request that starts in *data* and route it to get/post."""
        # Accept both CRLF and bare-LF framing (the bundled client uses LF).
        parts = re.split(rb'\r?\n\r?\n', data, maxsplit=1)
        content = parts[0].decode(errors='replace')
        body = bytearray(parts[1]) if len(parts) == 2 else bytearray()

        request_line = content.split(' ', 2)
        if len(request_line) < 2:
            self._respond(conn, '400 Bad Request', b'bad request line')
            return
        method = request_line[0]
        if method == 'GET':  # fetch a resource from the server
            self.get(conn, request_line[1])
        elif method == 'POST':  # upload a file to the server
            ret = re.search(r'(?im)^Content-Length:[ \t]*(\d+)', content)
            expected = int(ret.group(1)) if ret else len(body)
            # A single recv() rarely holds a whole upload: keep reading
            # until the announced number of body bytes has arrived.
            while len(body) < expected:
                chunk = conn.recv(65536)
                if not chunk:
                    break
                body.extend(chunk)
            ret = re.search(r'(?im)^Content-type:[ \t]*([^\r\n]+)', content)
            content_type = ret.group(1).strip() if ret else None
            self.post(conn, content_type, bytes(body).decode(errors='replace') or None)
        else:
            self._respond(conn, '405 Method Not Allowed', b'method not allowed')

    def handle_request(self, conn: socket):
        """Serve requests on *conn* until the peer disconnects, then close it."""
        while True:
            print('正在等待客户端发送消息...')
            try:
                data = conn.recv(4096)
            except OSError:
                data = b''
            if len(data) == 0:
                conn.close()
                return
            try:
                self._dispatch(conn, data)
            except OSError as exc:
                print('处理请求发生异常:', exc)
                conn.close()
                return

    def get(self, conn, path):
        """Send the file at *path* under ``static/``, or a 404 page."""
        if path == '/':
            path = '/index.html'
        response_body = self._read_static(path)
        if response_body is not None:
            status = '200 OK'
        else:
            status = '404 Not Found'
            response_body = self._read_static('/404.html')
            if response_body is None:
                response_body = b'<h1>404 Not Found</h1>'
        self._respond(conn, status, response_body)

    def post(self, conn, content_type, body):
        """Store a base64-encoded PNG upload as ``out.png`` and report the outcome.

        Args:
            conn: Connected client socket.
            content_type: Value of the request's Content-type header, or None.
            body: Request body as text (base64), or None when absent.
        """
        if content_type not in self.png_types:
            self._respond(conn, '415 Unsupported Media Type', b'unsupported content type')
            return
        try:
            img_data = base64.b64decode((body or '').encode())
        except (binascii.Error, ValueError):
            img_data = b''
        if not img_data:
            # Missing, truncated or otherwise undecodable payload.
            self._respond(conn, '400 Bad Request', b'invalid base64 body')
            return
        with open('out.png', 'wb') as f:
            f.write(img_data)
        self._respond(conn, '200 OK', '上传成功'.encode(), 'text/plain; charset=utf-8')

    def start(self):
        """Accept connections forever, handling each one in a daemon thread."""
        while True:
            print('等待客户端...')
            conn, addr = self.server.accept()
            Thread(target=self.handle_request, args=(conn,), daemon=True).start()


if __name__ == '__main__':
    web_server = HttpServer(8000)
    web_server.start()
(2) 客户端
import base64
from socket import socket


class HttpClient():
    """Tiny HTTP client for the companion upload server."""

    def __init__(self, host='192.168.3.67', port=8000):
        """Connect to the server at *host*:*port*."""
        self.conn = socket()
        self.conn.connect((host, port))

    def close(self):
        """Close the connection to the server."""
        self.conn.close()

    def _exchange(self, request):
        """Send *request* (bytes), then print and return the decoded reply."""
        # sendall() keeps writing until the whole request is out; send() may
        # stop early, which would truncate a large upload.
        self.conn.sendall(request)
        response = self.conn.recv(4096).decode(errors='replace')
        print(response)
        return response

    def get_index(self):
        """Request ``/`` and return the response text (also printed)."""
        request_line = 'GET / HTTP/1.1\n'
        request_header = 'User-Agent: Python\n'
        request_body = ''
        request = (request_line + request_header + '\n' + request_body).encode()
        return self._exchange(request)

    def upload_pic(self, path='1.png'):
        """Upload the image at *path* as base64 and return the response text.

        The file is read as bytes and base64-encoded so it can travel as
        text.  A Content-Length header announces the body size so a server
        can tell when the whole upload has arrived.
        """
        with open(path, 'rb') as f:
            request_body = base64.b64encode(f.read()).decode()
        request_line = 'POST / HTTP/1.1\n'
        request_header = (
            'Content-type: images/png\n'
            'Content-Length: {}\n'
        ).format(len(request_body))
        request = (request_line + request_header + '\n' + request_body).encode()
        return self._exchange(request)


# Guarded so that importing this module does not open a connection.
if __name__ == '__main__':
    cli = HttpClient()
    try:
        # cli.get_index()
        cli.upload_pic()
    finally:
        cli.close()