Python Websocket服务端

Posted by 橙叶 on Sat, Mar 16, 2019

用Python做了个Websocket的服务端,虽然知道相关的库应该早就有了,但是还是想自己做出来。

当然也参考了不少网上的教程,琢磨了好久的Websocket协议的格式。

内含一个WebsocketServer类,可以实现握手、建立连接、心跳(ping)、传输数据,并支持上下文管理器。可以接收任意类型的数据,发送字符串(稍微改一下也可以发送二进制数据),支持数据分片(接收和发送都可以)。

贴到了GitHub上:https://github.com/OGRLEAF/lyricstools/blob/master/WebSocketServer.py 。GitHub上的版本解决了关闭时仍然返回1006错误码的问题(原因是Socket连接未完全关闭)。下面的代码不再更新了,其实看看就行,就是个玩具。

  1. # coding=utf-8
  2. import socket
  3. import time
  4. import hashlib
  5. import base64
  6. import struct
  7. from multiprocessing import Process
  8. HTTP_RESPONSE = "HTTP/1.1 {code} {msg}\r\n"  \
  9.                 "Server:LyricTool\r\n" \
  10.                 "Date:{date}\r\n" \
  11.                 "Content-Length:{length}\r\n" \
  12.                 "\r\n" \
  13.                 "{content}\r\n"
  14. STATUS_CODE = {200: 'OK', 501: 'Not Implemented'}
  15. UPGRADE_WS = "HTTP/1.1 101 Switching Protocols\r\n" \
  16.              "Connection: Upgrade\r\n" \
  17.              "Upgrade: websocket\r\n" \
  18.              "Sec-WebSocket-Accept: {}\r\n" \
  19.              "WebSocket-Protocol: chat\r\n\r\n"
  20. def sec_key_gen(msg):
  21.     key = msg + '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'
  22.     ser_key = hashlib.sha1(key.encode('utf-8')).digest()
  23.     return base64.b64encode(ser_key).decode()
  24. class WebsocketServer:
  25.     def __init__(self, conn):
  26.         # 接受一个socket对象
  27.         self.conn = conn
  28.         self.state = 0
  29.     def open(self):
  30.         self._handshake()
  31.         if self.state == 1:
  32.             return self
  33.         else:
  34.             raise Exception('Handsake failed.')
  35.     def __enter__(self):
  36.         return self.open()
  37.     def getstate(self):
  38.         # 获取连接状态
  39.         state_map = {0: 'READY', 1: 'CONNECTION ESTABLISHED', 2: 'HANDSHAKED', 3: 'FAILED', -1: 'CLOSED'}
  40.         return self.state, state_map[self.state]
  41.     def _handshake(self):
  42.         raw_data = b''
  43.         while True:
  44.             fragment = self.conn.recv(1024)
  45.             raw_data += fragment
  46.             if len(fragment) < 1024:
  47.                 break
  48.         data = raw_data.decode('utf-8')
  49.         header, content = data.split('\r\n\r\n', 1)
  50.         header = header.split('\r\n')
  51.         options = map(lambda i: i.split(': '), header[1:])
  52.         options_dict = {item[0]: item[1] for item in options}
  53.         date = time.strftime("%m,%d%Y", time.localtime())
  54.         if 'Sec-WebSocket-Key' not in options_dict:
  55.             self.conn.send(
  56.                 bytes(HTTP_RESPONSE.format(code=501, msg=STATUS_CODE[501], date=date, length=len(date), content=date),
  57.                       encoding='utf-8'))
  58.             self.conn.close()
  59.             self.state = 3
  60.             return True
  61.         else:
  62.             self.state = 2
  63.             self._build(options_dict['Sec-WebSocket-Key'])
  64.             return True
  65.     def _build(self, sec_key):
  66.         # 建立WebSocket连接
  67.         response = UPGRADE_WS.format(sec_key_gen(sec_key))
  68.         self.conn.send(bytes(response, encoding='utf-8'))
  69.         self.state = 1
  70.         return True
  71.     def _get_data(self, info, setcode):
  72.         payload_len = info[1] & 127
  73.         fin = 1 if info[0] & 128 == 128 else 0
  74.         opcode = info[0] & 15  # 提取opcode
  75.         # 提取载荷数据
  76.         if payload_len == 126:
  77.             # extend_payload_len = info[2:4]
  78.             mask = info[4:8]
  79.             decoded = info[8:]
  80.         elif payload_len == 127:
  81.             # extend_payload_len = info[2:10]
  82.             mask = info[10:14]
  83.             decoded = info[14:]
  84.         else:
  85.             # extend_payload_len = None
  86.             mask = info[2:6]
  87.             decoded = info[6:]
  88.         bytes_list = bytearray()
  89.         for i in range(len(decoded)):
  90.             chunk = decoded[i] ^ mask[i % 4]
  91.             bytes_list.append(chunk)
  92.         if opcode == 0x00:
  93.             opcode = setcode
  94.         if opcode == 0x01:   # 文本帧
  95.             body = str(bytes_list, encoding='utf-8')
  96.             return fin, opcode, body
  97.         elif opcode == 0x08:
  98.             self.close()
  99.             raise IOError('Connection closed by Client.')
  100.         else:  # 二进制帧或其他,原样返回
  101.             body = decoded
  102.             return fin, opcode, body
  103.     def recv(self):
  104.         msg = ''
  105.         # 处理切片
  106.         opcode = 0x00
  107.         while True:
  108.             raw_data = b''
  109.             while True:
  110.                 section = self.conn.recv(1024)
  111.                 raw_data += section
  112.                 if len(section) < 1024:
  113.                     break
  114.             fin, _opcode, fragment = self._get_data(raw_data, opcode)
  115.             opcode = _opcode if _opcode != 0x00 else opcode
  116.             msg += fragment
  117.             if fin == 1:   # 是否是最后一个分片
  118.                 break
  119.         return msg
  120.     def send(self, msg, fin=True):
  121.         # 发送数据
  122.         data = struct.pack('B', 129) if fin else struct.pack('B', 0)
  123.         msg_len = len(msg)
  124.         if msg_len <= 125:
  125.             data += struct.pack('B', msg_len)
  126.         elif msg_len <= (2**16 - 1):
  127.             data += struct.pack('!BH', 126, msg_len)
  128.         elif msg_len <= (2**64 - 1):
  129.             data += struct.pack('!BQ', 127, msg_len)
  130.         else:
  131.             # 分片传输超大内容(应该用不到)
  132.             while True:
  133.                 fragment = msg[:(2**64 - 1)]
  134.                 msg -= fragment
  135.                 if msg > (2**64 - 1):
  136.                     self.send(fragment, False)
  137.                 else:
  138.                     self.send(fragment)
  139.         data += bytes(msg, encoding='utf-8')
  140.         self.conn.send(data)
  141.     def ping(self):
  142.         ping_msg = 0b10001001
  143.         data = struct.pack('B', ping_msg)
  144.         data += struct.pack('B', 0)
  145.         while True:
  146.             self.conn.send(data)
  147.             data = self.conn.recv(1024)
  148.             pong = data[0] & 127
  149.             if pong != 9:
  150.                 self.close()
  151.                 raise IOError('Connection closed by Client.')
  152.     def close(self):
  153.         self.conn.close()
  154.         self.state = -1
  155.     def __exit__(self, exc_type, exc_val, exc_tb):
  156.         if exc_type is IOError:
  157.             print(exc_val)
  158.         self.close()
  159. def ws_handler(conn):
  160.     with WebsocketServer(conn) as ws:
  161.         while True:
  162.             msg = ws.recv()
  163.             if ws.state == -1:
  164.                 break
  165.             print(msg)
  166.             ws.send(str(msg.split(',')))
  167. if __name__ == '__main__':
  168.     s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  169.     s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  170.     s.bind(('192.168.3.121', 8081))
  171.     s.listen(1)
  172.     print('Server Started.')
  173.     while True:
  174.         con, addr = s.accept()
  175.         print("Accepted. {0}, {1}".format(con, str(addr)))
  176.         p = Process(target=ws_handler, args=(con,))
  177.         p.start()


comments powered by Disqus