需要在配置文件内定义
[vip.ws]host = "ws.ebonex.vip"agreement = "wss"api = [['api', 'wss']]
api
{"id": "quote","path": "/ws/quote/v1?lang=zh-cn","title": "行情"}
case
class TestCase(TestBase):def test_01(self, data_conversion, data):topic, outs = data_conversion.get('topic', 'outs')client = self.settings.ws.get_client("quote", *topic, on_pong=self.on_pong)client.run_forever(ping_interval=2, ping_payload=json.dumps({"ping": int(time.time() * 1000)}))def on_pong(self, ws: websocket.WebSocketApp, message):self.logger.debug("收到pong消息:{}".format(message))if time.time() - ws.open_start_time > 20:ws.close()
data(topic)
[{"__file": "test_quote","__func": "test_01","__dependent_class": null},{"title": "行情","tags": ["au", "debug"],"topic": [{"id":"BTCAUD","topic":"klineV2_5m","symbol":"BTCAUD","params":{"org":9001,"binary":false},"event":"sub"}],"outs": {}}]
get_client入参
def get_client(self, api_id, *topics, header=None, **kwargs) -> websocket.WebSocketApp:"""获取客户端api_id:topics: topics列表header"""
websocket采用的是websocket.WebSocketApp,相关方法及代码
class WebSocketApp:"""Higher level of APIs are provided. The interface is like JavaScript WebSocket object."""def __init__(self, url, header=None,on_open=None, on_message=None, on_error=None,on_close=None, on_ping=None, on_pong=None,on_cont_message=None,keep_running=True, get_mask_key=None, cookie=None,subprotocols=None,on_data=None):"""WebSocketApp initializationParameters----------url: strWebsocket url.header: list or dictCustom header for websocket handshake.on_open: functionCallback object which is called at opening websocket.on_open has one argument.The 1st argument is this class object.on_message: functionCallback object which is called when received data.on_message has 2 arguments.The 1st argument is this class object.The 2nd argument is utf-8 data received from the server.on_error: functionCallback object which is called when we get error.on_error has 2 arguments.The 1st argument is this class object.The 2nd argument is exception object.on_close: functionCallback object which is called when connection is closed.on_close has 3 arguments.The 1st argument is this class object.The 2nd argument is close_status_code.The 3rd argument is close_msg.on_cont_message: functionCallback object which is called when a continuationframe is received.on_cont_message has 3 arguments.The 1st argument is this class object.The 2nd argument is utf-8 string which we get from the server.The 3rd argument is continue flag. if 0, the data continueto next frame dataon_data: functionCallback object which is called when a message received.This is called before on_message or on_cont_message,and then on_message or on_cont_message is called.on_data has 4 argument.The 1st argument is this class object.The 2nd argument is utf-8 string which we get from the server.The 3rd argument is data type. ABNF.OPCODE_TEXT or ABNF.OPCODE_BINARY will be came.The 4th argument is continue flag. If 0, the data continuekeep_running: boolThis parameter is obsolete and ignored.get_mask_key: functionA callable function to get new mask keys, see theWebSocket.set_mask_key's docstring for more information.cookie: strCookie value.subprotocols: listList of available sub protocols. Default is None."""def send(self, data, opcode=ABNF.OPCODE_TEXT):"""send messageParameters----------data: strMessage to send. If you set opcode to OPCODE_TEXT,data must be utf-8 string or unicode.opcode: intOperation code of data. Default is OPCODE_TEXT."""def close(self, **kwargs):"""Close websocket connection."""def run_forever(self, sockopt=None, sslopt=None,ping_interval=0, ping_timeout=None,ping_payload="",http_proxy_host=None, http_proxy_port=None,http_no_proxy=None, http_proxy_auth=None,skip_utf8_validation=False,host=None, origin=None, dispatcher=None,suppress_origin=False, proxy_type=None):"""Run event loop for WebSocket framework.This loop is an infinite loop and is alive while websocket is available.Parameters----------sockopt: tupleValues for socket.setsockopt.sockopt must be tupleand each element is argument of sock.setsockopt.sslopt: dictOptional dict object for ssl socket option.ping_interval: int or floatAutomatically send "ping" commandevery specified period (in seconds).If set to 0, no ping is sent periodically.ping_timeout: int or floatTimeout (in seconds) if the pong message is not received.ping_payload: strPayload message to send with each ping.http_proxy_host: strHTTP proxy host name.http_proxy_port: int or strHTTP proxy port. If not set, set to 80.http_no_proxy: listWhitelisted host names that don't use the proxy.skip_utf8_validation: boolskip utf8 validation.host: strupdate host header.origin: strupdate origin header.dispatcher: Dispatcher objectcustomize reading data from socket.suppress_origin: boolsuppress outputting origin header.Returns-------teardown: boolFalse if caught KeyboardInterrupt, True if other exception was raised during a loop"""
附上send的opcode类型
# 导入方式import websocketwebsocket.ABNF.OPCODE_PONG# 类型OPCODE_CONT = 0x0OPCODE_TEXT = 0x1OPCODE_BINARY = 0x2OPCODE_CLOSE = 0x8OPCODE_PING = 0x9OPCODE_PONG = 0xa# available operation code value tupleOPCODES = (OPCODE_CONT, OPCODE_TEXT, OPCODE_BINARY, OPCODE_CLOSE,OPCODE_PING, OPCODE_PONG)# opcode human readable stringOPCODE_MAP = {OPCODE_CONT: "cont",OPCODE_TEXT: "text",OPCODE_BINARY: "binary",OPCODE_CLOSE: "close",OPCODE_PING: "ping",OPCODE_PONG: "pong"}# data length threshold.LENGTH_7 = 0x7eLENGTH_16 = 1 << 16LENGTH_63 = 1 << 63
监听示例
def on_data(self, ws:websocket.WebSocketApp, message, data_type, flag):logger.debug("{}内容:{},类型:{},flage:{}".format(ws.title, message, data_type, flag))def on_close(self, ws:websocket.WebSocketApp, close_status_code, close_msg):logger.debug("{}关闭连接,状态码:{}, 关闭消息:{}.....".format(ws.title, close_status_code, close_msg))def on_error(self, ws:websocket.WebSocketApp, exception):if getattr(a, 'format_exc', None) is not None:logger.debug("异常():{}".format(ws.title, exception.format_exc()))else:logger.exception("异常({})".format(ws.title))def on_open(self, ws:websocket.WebSocketApp):logger.debug("websocket()连接成功.....".format(ws.title))ws.open_start_time = time.time()for topic in ws.topics:ws.send(json.dumps(topic))def on_pong(self, ws:websocket.WebSocketApp, message):logger.debug("{}收到pong消息:{}".format(ws.title, message))def on_ping(self, ws:websocket.WebSocketApp, message):ws.send(json.dumps({"pong": int(time.time() * 1000)}), websocket.ABNF.OPCODE_PONG)logger.debug("{}收到ping消息:{}".format(ws.title, message))def on_message(self, ws:websocket.WebSocketApp, message):logger.debug("{}收到消息:{}".format(ws.title, message))
