分享

Python Websocket服务端

用Python做了个Websocket的服务端,虽然知道相关的库应该早就有了,但是还是想自己做出来。

当然也参考了不少网上的教程,琢磨了好久的Websocket协议的格式。

内含一个WebsocketServer类,可以实现握手、建立连接、心跳(ping)、传输数据、上下文管理器。可以接收任意类型的数据,发送字符串(稍微改一下也可以发送二进制数据),支持数据分片(传输和发送都可以)。

  1. # coding=utf-8  
  2. import socket  
  3. import time  
  4. import hashlib  
  5. import base64  
  6. import struct  
  7. from multiprocessing import Process  
  8.   
  9.   
  10. HTTP_RESPONSE = "HTTP/1.1 {code} {msg}\r\n"  \  
  11.                 "Server:LyricTool\r\n" \  
  12.                 "Date:{date}\r\n" \  
  13.                 "Content-Length:{length}\r\n" \  
  14.                 "\r\n" \  
  15.                 "{content}\r\n"  
  16. STATUS_CODE = {200: 'OK', 501: 'Not Implemented'}  
  17. UPGRADE_WS = "HTTP/1.1 101 Switching Protocols\r\n" \  
  18.              "Connection: Upgrade\r\n" \  
  19.              "Upgrade: websocket\r\n" \  
  20.              "Sec-WebSocket-Accept: {}\r\n" \  
  21.              "WebSocket-Protocol: chat\r\n\r\n"  
  22.   
  23.   
  24. def sec_key_gen(msg):  
  25.     key = msg + '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'  
  26.     ser_key = hashlib.sha1(key.encode('utf-8')).digest()  
  27.     return base64.b64encode(ser_key).decode()  
  28.   
  29.   
  30. class WebsocketServer:  
  31.     def __init__(self, conn):  
  32.         # 接受一个socket对象  
  33.         self.conn = conn  
  34.         self.state = 0  
  35.   
  36.     def open(self):  
  37.         self._handshake()  
  38.         if self.state == 1:  
  39.             return self  
  40.         else:  
  41.             raise Exception('Handsake failed.')  
  42.   
  43.     def __enter__(self):  
  44.         return self.open()  
  45.   
  46.     def getstate(self):  
  47.         # 获取连接状态  
  48.         state_map = {0: 'READY', 1: 'CONNECTION ESTABLISHED', 2: 'HANDSHAKED', 3: 'FAILED', -1: 'CLOSED'}  
  49.         return self.state, state_map[self.state]  
  50.   
  51.     def _handshake(self):  
  52.         raw_data = b''  
  53.         while True:  
  54.             fragment = self.conn.recv(1024)  
  55.             raw_data += fragment  
  56.             if len(fragment) < 1024:  
  57.                 break  
  58.         data = raw_data.decode('utf-8')  
  59.         header, content = data.split('\r\n\r\n', 1)  
  60.         header = header.split('\r\n')  
  61.         options = map(lambda i: i.split(': '), header[1:])  
  62.         options_dict = {item[0]: item[1] for item in options}  
  63.         date = time.strftime("%m,%d%Y", time.localtime())  
  64.         if 'Sec-WebSocket-Key' not in options_dict:  
  65.             self.conn.send(  
  66.                 bytes(HTTP_RESPONSE.format(code=501, msg=STATUS_CODE[501], date=date, length=len(date), content=date),  
  67.                       encoding='utf-8'))  
  68.             self.conn.close()  
  69.             self.state = 3  
  70.             return True  
  71.         else:  
  72.             self.state = 2  
  73.             self._build(options_dict['Sec-WebSocket-Key'])  
  74.             return True  
  75.   
  76.     def _build(self, sec_key):  
  77.         # 建立WebSocket连接  
  78.         response = UPGRADE_WS.format(sec_key_gen(sec_key))  
  79.         self.conn.send(bytes(response, encoding='utf-8'))  
  80.         self.state = 1  
  81.         return True  
  82.   
  83.     def _get_data(self, info, setcode):  
  84.         payload_len = info[1] & 127  
  85.         fin = 1 if info[0] & 128 == 128 else 0  
  86.         opcode = info[0] & 15  # 提取opcode  
  87.   
  88.         # 提取载荷数据  
  89.         if payload_len == 126:  
  90.             # extend_payload_len = info[2:4]  
  91.             mask = info[4:8]  
  92.             decoded = info[8:]  
  93.         elif payload_len == 127:  
  94.             # extend_payload_len = info[2:10]  
  95.             mask = info[10:14]  
  96.             decoded = info[14:]  
  97.         else:  
  98.             # extend_payload_len = None  
  99.             mask = info[2:6]  
  100.             decoded = info[6:]  
  101.   
  102.         bytes_list = bytearray()  
  103.         for i in range(len(decoded)):  
  104.             chunk = decoded[i] ^ mask[i % 4]  
  105.             bytes_list.append(chunk)  
  106.         if opcode == 0x00:  
  107.             opcode = setcode  
  108.         if opcode == 0x01:   # 文本帧  
  109.             body = str(bytes_list, encoding='utf-8')  
  110.             return fin, opcode, body  
  111.         elif opcode == 0x08:  
  112.             self.close()  
  113.             raise IOError('Connection closed by Client.')  
  114.         else:  # 二进制帧或其他,原样返回  
  115.             body = decoded  
  116.             return fin, opcode, body  
  117.   
  118.     def recv(self):  
  119.         msg = ''  
  120.         # 处理切片  
  121.         opcode = 0x00  
  122.         while True:  
  123.             raw_data = b''  
  124.             while True:  
  125.                 section = self.conn.recv(1024)  
  126.                 raw_data += section  
  127.                 if len(section) < 1024:  
  128.                     break  
  129.             fin, _opcode, fragment = self._get_data(raw_data, opcode)  
  130.             opcode = _opcode if _opcode != 0x00 else opcode  
  131.             msg += fragment  
  132.             if fin == 1:   # 是否是最后一个分片  
  133.                 break  
  134.         return msg  
  135.   
  136.     def send(self, msg, fin=True):  
  137.         # 发送数据  
  138.         data = struct.pack('B', 129) if fin else struct.pack('B', 0)  
  139.         msg_len = len(msg)  
  140.         if msg_len <= 125:  
  141.             data += struct.pack('B', msg_len)  
  142.         elif msg_len <= (2**16 - 1):  
  143.             data += struct.pack('!BH', 126, msg_len)  
  144.         elif msg_len <= (2**64 - 1):  
  145.             data += struct.pack('!BQ', 127, msg_len)  
  146.         else:  
  147.             # 分片传输超大内容(应该用不到)  
  148.             while True:  
  149.                 fragment = msg[:(2**64 - 1)]  
  150.                 msg -= fragment  
  151.                 if msg > (2**64 - 1):  
  152.                     self.send(fragment, False)  
  153.                 else:  
  154.                     self.send(fragment)  
  155.         data += bytes(msg, encoding='utf-8')  
  156.         self.conn.send(data)  
  157.   
  158.     def ping(self):  
  159.         ping_msg = 0b10001001  
  160.         data = struct.pack('B', ping_msg)  
  161.         data += struct.pack('B', 0)  
  162.         while True:  
  163.             self.conn.send(data)  
  164.             data = self.conn.recv(1024)  
  165.             pong = data[0] & 127  
  166.             if pong != 9:  
  167.                 self.close()  
  168.                 raise IOError('Connection closed by Client.')  
  169.   
  170.     def close(self):  
  171.         self.conn.close()  
  172.         self.state = -1  
  173.   
  174.     def __exit__(self, exc_type, exc_val, exc_tb):  
  175.         if exc_type is IOError:  
  176.             print(exc_val)  
  177.         self.close()  
  178.   
  179.   
  180. def ws_handler(conn):  
  181.     with WebsocketServer(conn) as ws:  
  182.         while True:  
  183.             msg = ws.recv()  
  184.             if ws.state == -1:  
  185.                 break  
  186.             print(msg)  
  187.             ws.send(str(msg.split(',')))  
  188.   
  189.   
  190. if __name__ == '__main__':  
  191.     s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  
  192.     s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)  
  193.     s.bind(('192.168.3.121', 8081))  
  194.     s.listen(1)  
  195.     print('Server Started.')  
  196.     while True:  
  197.         con, addr = s.accept()  
  198.         print("Accepted. {0}, {1}".format(con, str(addr)))  
  199.         p = Process(target=ws_handler, args=(con,))  
  200.         p.start()  
(5)

本文由 橙叶博客 作者:FrankGreg 发表,转载请注明来源!

热评文章

评论:

2 条评论,访客:2 条,博主:0 条
  1. avatar
    李嘉鹏发布于: 

    您好,您写的文章对我帮助非常大,实在感谢。现在使用您代码的时候遇到以下问题:
    1.process和websocket文件报错:File “E:\python\lib\multiprocessing\process.py”, line 297, in _bootstrap
    self.run()
    File “E:\python\lib\multiprocessing\process.py”, line 99, in run
    self._target(*self._args, **self._kwargs)
    File “C:\Users\jiape\PycharmProjects\get_HTTP\Websocket_server.py”, line 182, in ws_handler
    msg = ws.recv()
    File “C:\Users\jiape\PycharmProjects\get_HTTP\Websocket_server.py”, line 128, in recv
    fin, _opcode, fragment = self._get_data(raw_data, opcode)
    File “C:\Users\jiape\PycharmProjects\get_HTTP\Websocket_server.py”, line 112, in _get_data
    raise IOError(‘Connection closed by Client.’)
    OSError: Connection closed by Client.

    2.客户端报错:WebSocket connection to ‘ws://127.0.0.1:8081/’ failed: A server must not mask any frames that it sends to the client.

    请问有什么办法可以解决吗?

发表评论