需要在配置文件内定义
[vip.ws]
host = "ws.ebonex.vip"
agreement = "wss"
api = [['api', 'wss']]
api
{
"id": "quote",
"path": "/ws/quote/v1?lang=zh-cn",
"title": "行情"
}
case
class TestCase(TestBase):
def test_01(self, data_conversion, data):
topic, outs = data_conversion.get('topic', 'outs')
client = self.settings.ws.get_client("quote", *topic, on_pong=self.on_pong)
client.run_forever(ping_interval=2, ping_payload=json.dumps({"ping": int(time.time() * 1000)}))
def on_pong(self, ws: websocket.WebSocketApp, message):
self.logger.debug("收到pong消息:{}".format(message))
if time.time() - ws.open_start_time > 20:
ws.close()
data(topic)
[
{
"__file": "test_quote",
"__func": "test_01",
"__dependent_class": null
},
{
"title": "行情",
"tags": ["au", "debug"],
"topic": [
{"id":"BTCAUD","topic":"klineV2_5m","symbol":"BTCAUD","params":{"org":9001,"binary":false},"event":"sub"}
],
"outs": {
}
}
]
get_client入参
def get_client(self, api_id, *topics, header=None, **kwargs) -> websocket.WebSocketApp:
"""
获取客户端
api_id:
topics: topics列表
header
"""
websocket采用的是websocket.WebSocketApp
,相关方法及代码
class WebSocketApp:
"""
Higher level of APIs are provided. The interface is like JavaScript WebSocket object.
"""
def __init__(self, url, header=None,
on_open=None, on_message=None, on_error=None,
on_close=None, on_ping=None, on_pong=None,
on_cont_message=None,
keep_running=True, get_mask_key=None, cookie=None,
subprotocols=None,
on_data=None):
"""
WebSocketApp initialization
Parameters
----------
url: str
Websocket url.
header: list or dict
Custom header for websocket handshake.
on_open: function
Callback object which is called at opening websocket.
on_open has one argument.
The 1st argument is this class object.
on_message: function
Callback object which is called when received data.
on_message has 2 arguments.
The 1st argument is this class object.
The 2nd argument is utf-8 data received from the server.
on_error: function
Callback object which is called when we get error.
on_error has 2 arguments.
The 1st argument is this class object.
The 2nd argument is exception object.
on_close: function
Callback object which is called when connection is closed.
on_close has 3 arguments.
The 1st argument is this class object.
The 2nd argument is close_status_code.
The 3rd argument is close_msg.
on_cont_message: function
Callback object which is called when a continuation
frame is received.
on_cont_message has 3 arguments.
The 1st argument is this class object.
The 2nd argument is utf-8 string which we get from the server.
The 3rd argument is continue flag. if 0, the data continue
to next frame data
on_data: function
Callback object which is called when a message received.
This is called before on_message or on_cont_message,
and then on_message or on_cont_message is called.
on_data has 4 argument.
The 1st argument is this class object.
The 2nd argument is utf-8 string which we get from the server.
The 3rd argument is data type. ABNF.OPCODE_TEXT or ABNF.OPCODE_BINARY will be came.
The 4th argument is continue flag. If 0, the data continue
keep_running: bool
This parameter is obsolete and ignored.
get_mask_key: function
A callable function to get new mask keys, see the
WebSocket.set_mask_key's docstring for more information.
cookie: str
Cookie value.
subprotocols: list
List of available sub protocols. Default is None.
"""
def send(self, data, opcode=ABNF.OPCODE_TEXT):
"""
send message
Parameters
----------
data: str
Message to send. If you set opcode to OPCODE_TEXT,
data must be utf-8 string or unicode.
opcode: int
Operation code of data. Default is OPCODE_TEXT.
"""
def close(self, **kwargs):
"""
Close websocket connection.
"""
def run_forever(self, sockopt=None, sslopt=None,
ping_interval=0, ping_timeout=None,
ping_payload="",
http_proxy_host=None, http_proxy_port=None,
http_no_proxy=None, http_proxy_auth=None,
skip_utf8_validation=False,
host=None, origin=None, dispatcher=None,
suppress_origin=False, proxy_type=None):
"""
Run event loop for WebSocket framework.
This loop is an infinite loop and is alive while websocket is available.
Parameters
----------
sockopt: tuple
Values for socket.setsockopt.
sockopt must be tuple
and each element is argument of sock.setsockopt.
sslopt: dict
Optional dict object for ssl socket option.
ping_interval: int or float
Automatically send "ping" command
every specified period (in seconds).
If set to 0, no ping is sent periodically.
ping_timeout: int or float
Timeout (in seconds) if the pong message is not received.
ping_payload: str
Payload message to send with each ping.
http_proxy_host: str
HTTP proxy host name.
http_proxy_port: int or str
HTTP proxy port. If not set, set to 80.
http_no_proxy: list
Whitelisted host names that don't use the proxy.
skip_utf8_validation: bool
skip utf8 validation.
host: str
update host header.
origin: str
update origin header.
dispatcher: Dispatcher object
customize reading data from socket.
suppress_origin: bool
suppress outputting origin header.
Returns
-------
teardown: bool
False if caught KeyboardInterrupt, True if other exception was raised during a loop
"""
附上send的opcode类型
# 导入方式
import websocket
websocket.ABNF.OPCODE_PONG
# 类型
OPCODE_CONT = 0x0
OPCODE_TEXT = 0x1
OPCODE_BINARY = 0x2
OPCODE_CLOSE = 0x8
OPCODE_PING = 0x9
OPCODE_PONG = 0xa
# available operation code value tuple
OPCODES = (OPCODE_CONT, OPCODE_TEXT, OPCODE_BINARY, OPCODE_CLOSE,
OPCODE_PING, OPCODE_PONG)
# opcode human readable string
OPCODE_MAP = {
OPCODE_CONT: "cont",
OPCODE_TEXT: "text",
OPCODE_BINARY: "binary",
OPCODE_CLOSE: "close",
OPCODE_PING: "ping",
OPCODE_PONG: "pong"
}
# data length threshold.
LENGTH_7 = 0x7e
LENGTH_16 = 1 << 16
LENGTH_63 = 1 << 63
监听示例
def on_data(self, ws:websocket.WebSocketApp, message, data_type, flag):
logger.debug("{}内容:{},类型:{},flage:{}".format(ws.title, message, data_type, flag))
def on_close(self, ws:websocket.WebSocketApp, close_status_code, close_msg):
logger.debug("{}关闭连接,状态码:{}, 关闭消息:{}.....".format(ws.title, close_status_code, close_msg))
def on_error(self, ws:websocket.WebSocketApp, exception):
if getattr(a, 'format_exc', None) is not None:
logger.debug("异常():{}".format(ws.title, exception.format_exc()))
else:
logger.exception("异常({})".format(ws.title))
def on_open(self, ws:websocket.WebSocketApp):
logger.debug("websocket()连接成功.....".format(ws.title))
ws.open_start_time = time.time()
for topic in ws.topics:
ws.send(json.dumps(topic))
def on_pong(self, ws:websocket.WebSocketApp, message):
logger.debug("{}收到pong消息:{}".format(ws.title, message))
def on_ping(self, ws:websocket.WebSocketApp, message):
ws.send(json.dumps({"pong": int(time.time() * 1000)}), websocket.ABNF.OPCODE_PONG)
logger.debug("{}收到ping消息:{}".format(ws.title, message))
def on_message(self, ws:websocket.WebSocketApp, message):
logger.debug("{}收到消息:{}".format(ws.title, message))