需要在配置文件内定义

    1. [vip.ws]
    2. host = "ws.ebonex.vip"
    3. agreement = "wss"
    4. api = [['api', 'wss']]

    api

    1. {
    2. "id": "quote",
    3. "path": "/ws/quote/v1?lang=zh-cn",
    4. "title": "行情"
    5. }

    case

    1. class TestCase(TestBase):
    2. def test_01(self, data_conversion, data):
    3. topic, outs = data_conversion.get('topic', 'outs')
    4. client = self.settings.ws.get_client("quote", *topic, on_pong=self.on_pong)
    5. client.run_forever(ping_interval=2, ping_payload=json.dumps({"ping": int(time.time() * 1000)}))
    6. def on_pong(self, ws: websocket.WebSocketApp, message):
    7. self.logger.debug("收到pong消息:{}".format(message))
    8. if time.time() - ws.open_start_time > 20:
    9. ws.close()

    data(topic)

    1. [
    2. {
    3. "__file": "test_quote",
    4. "__func": "test_01",
    5. "__dependent_class": null
    6. },
    7. {
    8. "title": "行情",
    9. "tags": ["au", "debug"],
    10. "topic": [
    11. {"id":"BTCAUD","topic":"klineV2_5m","symbol":"BTCAUD","params":{"org":9001,"binary":false},"event":"sub"}
    12. ],
    13. "outs": {
    14. }
    15. }
    16. ]

    get_client入参

    1. def get_client(self, api_id, *topics, header=None, **kwargs) -> websocket.WebSocketApp:
    2. """
    3. 获取客户端
    4. api_id:
    5. topics: topics列表
    6. header
    7. """

    websocket采用的是websocket.WebSocketApp,相关方法及代码

    1. class WebSocketApp:
    2. """
    3. Higher level of APIs are provided. The interface is like JavaScript WebSocket object.
    4. """
    5. def __init__(self, url, header=None,
    6. on_open=None, on_message=None, on_error=None,
    7. on_close=None, on_ping=None, on_pong=None,
    8. on_cont_message=None,
    9. keep_running=True, get_mask_key=None, cookie=None,
    10. subprotocols=None,
    11. on_data=None):
    12. """
    13. WebSocketApp initialization
    14. Parameters
    15. ----------
    16. url: str
    17. Websocket url.
    18. header: list or dict
    19. Custom header for websocket handshake.
    20. on_open: function
    21. Callback object which is called at opening websocket.
    22. on_open has one argument.
    23. The 1st argument is this class object.
    24. on_message: function
    25. Callback object which is called when received data.
    26. on_message has 2 arguments.
    27. The 1st argument is this class object.
    28. The 2nd argument is utf-8 data received from the server.
    29. on_error: function
    30. Callback object which is called when we get error.
    31. on_error has 2 arguments.
    32. The 1st argument is this class object.
    33. The 2nd argument is exception object.
    34. on_close: function
    35. Callback object which is called when connection is closed.
    36. on_close has 3 arguments.
    37. The 1st argument is this class object.
    38. The 2nd argument is close_status_code.
    39. The 3rd argument is close_msg.
    40. on_cont_message: function
    41. Callback object which is called when a continuation
    42. frame is received.
    43. on_cont_message has 3 arguments.
    44. The 1st argument is this class object.
    45. The 2nd argument is utf-8 string which we get from the server.
    46. The 3rd argument is continue flag. if 0, the data continue
    47. to next frame data
    48. on_data: function
    49. Callback object which is called when a message received.
    50. This is called before on_message or on_cont_message,
    51. and then on_message or on_cont_message is called.
    52. on_data has 4 argument.
    53. The 1st argument is this class object.
    54. The 2nd argument is utf-8 string which we get from the server.
    55. The 3rd argument is data type. ABNF.OPCODE_TEXT or ABNF.OPCODE_BINARY will be came.
    56. The 4th argument is continue flag. If 0, the data continue to next frame data.
    57. keep_running: bool
    58. This parameter is obsolete and ignored.
    59. get_mask_key: function
    60. A callable function to get new mask keys, see the
    61. WebSocket.set_mask_key's docstring for more information.
    62. cookie: str
    63. Cookie value.
    64. subprotocols: list
    65. List of available sub protocols. Default is None.
    66. """
    67. def send(self, data, opcode=ABNF.OPCODE_TEXT):
    68. """
    69. send message
    70. Parameters
    71. ----------
    72. data: str
    73. Message to send. If you set opcode to OPCODE_TEXT,
    74. data must be utf-8 string or unicode.
    75. opcode: int
    76. Operation code of data. Default is OPCODE_TEXT.
    77. """
    78. def close(self, **kwargs):
    79. """
    80. Close websocket connection.
    81. """
    82. def run_forever(self, sockopt=None, sslopt=None,
    83. ping_interval=0, ping_timeout=None,
    84. ping_payload="",
    85. http_proxy_host=None, http_proxy_port=None,
    86. http_no_proxy=None, http_proxy_auth=None,
    87. skip_utf8_validation=False,
    88. host=None, origin=None, dispatcher=None,
    89. suppress_origin=False, proxy_type=None):
    90. """
    91. Run event loop for WebSocket framework.
    92. This loop is an infinite loop and is alive while websocket is available.
    93. Parameters
    94. ----------
    95. sockopt: tuple
    96. Values for socket.setsockopt.
    97. sockopt must be tuple
    98. and each element is argument of sock.setsockopt.
    99. sslopt: dict
    100. Optional dict object for ssl socket option.
    101. ping_interval: int or float
    102. Automatically send "ping" command
    103. every specified period (in seconds).
    104. If set to 0, no ping is sent periodically.
    105. ping_timeout: int or float
    106. Timeout (in seconds) if the pong message is not received.
    107. ping_payload: str
    108. Payload message to send with each ping.
    109. http_proxy_host: str
    110. HTTP proxy host name.
    111. http_proxy_port: int or str
    112. HTTP proxy port. If not set, set to 80.
    113. http_no_proxy: list
    114. Whitelisted host names that don't use the proxy.
    115. skip_utf8_validation: bool
    116. skip utf8 validation.
    117. host: str
    118. update host header.
    119. origin: str
    120. update origin header.
    121. dispatcher: Dispatcher object
    122. customize reading data from socket.
    123. suppress_origin: bool
    124. suppress outputting origin header.
    125. Returns
    126. -------
    127. teardown: bool
    128. False if caught KeyboardInterrupt, True if other exception was raised during a loop
    129. """

    附上send的opcode类型

    1. # 导入方式
    2. import websocket
    3. websocket.ABNF.OPCODE_PONG
    4. # 类型
    5. OPCODE_CONT = 0x0
    6. OPCODE_TEXT = 0x1
    7. OPCODE_BINARY = 0x2
    8. OPCODE_CLOSE = 0x8
    9. OPCODE_PING = 0x9
    10. OPCODE_PONG = 0xa
    11. # available operation code value tuple
    12. OPCODES = (OPCODE_CONT, OPCODE_TEXT, OPCODE_BINARY, OPCODE_CLOSE,
    13. OPCODE_PING, OPCODE_PONG)
    14. # opcode human readable string
    15. OPCODE_MAP = {
    16. OPCODE_CONT: "cont",
    17. OPCODE_TEXT: "text",
    18. OPCODE_BINARY: "binary",
    19. OPCODE_CLOSE: "close",
    20. OPCODE_PING: "ping",
    21. OPCODE_PONG: "pong"
    22. }
    23. # data length threshold.
    24. LENGTH_7 = 0x7e
    25. LENGTH_16 = 1 << 16
    26. LENGTH_63 = 1 << 63

    监听示例

    1. def on_data(self, ws:websocket.WebSocketApp, message, data_type, flag):
    2. logger.debug("{}内容:{},类型:{},flag:{}".format(ws.title, message, data_type, flag))
    3. def on_close(self, ws:websocket.WebSocketApp, close_status_code, close_msg):
    4. logger.debug("{}关闭连接,状态码:{}, 关闭消息:{}.....".format(ws.title, close_status_code, close_msg))
    5. def on_error(self, ws:websocket.WebSocketApp, exception):
    6. if getattr(exception, 'format_exc', None) is not None:
    7. logger.debug("异常({}):{}".format(ws.title, exception.format_exc()))
    8. else:
    9. logger.exception("异常({})".format(ws.title))
    10. def on_open(self, ws:websocket.WebSocketApp):
    11. logger.debug("websocket({})连接成功.....".format(ws.title))
    12. ws.open_start_time = time.time()
    13. for topic in ws.topics:
    14. ws.send(json.dumps(topic))
    15. def on_pong(self, ws:websocket.WebSocketApp, message):
    16. logger.debug("{}收到pong消息:{}".format(ws.title, message))
    17. def on_ping(self, ws:websocket.WebSocketApp, message):
    18. ws.send(json.dumps({"pong": int(time.time() * 1000)}), websocket.ABNF.OPCODE_PONG)
    19. logger.debug("{}收到ping消息:{}".format(ws.title, message))
    20. def on_message(self, ws:websocket.WebSocketApp, message):
    21. logger.debug("{}收到消息:{}".format(ws.title, message))