5.6.2.3 Reach的RPC协议客户端实现细节
Reach的RPC协议的设计简单到只需要使用支持HTTP和JSON交互的程序语言就能实现。本文档将会从细节上介绍使用Python语言实现的RPC客户端信息。在基于RPC的前端教程中展示了这个库的一个使用实例。整个库有80行代码。
该库使用一些标准的Python库来与JSON、HTTP服务器和网络进行交互。
1 # flake8: noqa
2
3 import json
4 import os
5 import requests
6 import socket
7 import time
8 import urllib3
9
.. # ...
.. # ...
11 def mk_rpc(opts={}):
12 def opt_of(field, envvar, default=None, f=lambda x: x):
13 opt = f(opts.get(field)) if opts.get(field) is not None \
14 else f(os.environ.get(envvar)) if os.environ.get(envvar) is not None \
15 else default
16
17 if opt is None:
18 raise RuntimeError('Mandatory configuration unset for: %s' % field)
19
20 return opt
21
22 host = opt_of('host', 'REACH_RPC_SERVER')
23 port = opt_of('port', 'REACH_RPC_PORT')
24 key = opt_of('key', 'REACH_RPC_KEY')
25 timeout = opt_of('timeout', 'REACH_RPC_TIMEOUT', f=int, default=5)
26 verify = opt_of('verify', 'REACH_RPC_TLS_REJECT_UNVERIFIED', f=lambda x: x != '0')
27
.. # ...
该库提供了一个单一的函数,mk_rpc
,能够接受Reach的RPC 客户端标准选项。
.. # ...
28 if not verify:
29 urllib3.disable_warnings()
30 print('\n*** Warning! TLS verification disabled! ***\n')
31 print(' This is highly insecure in Real Life™ applications and must')
32 print(' only be permitted under controlled conditions (such as')
33 print(' during development).\n')
34
.. # ...
这段程序首先检查验证选项,并告知用于进行HTTPS交互的Python库关闭的警告信息。随后,程序会向用户显示一个警告,使用户对使用这一设置感到紧张。
.. # ...
35 # From: https://gist.github.com/butla/2d9a4c0f35ea47b7452156c96a4e7b12
36 start_time = time.perf_counter()
37 while True:
38 try:
39 with socket.create_connection((host, port), timeout=timeout):
40 break
41 except OSError as ex:
42 time.sleep(0.01)
43 if time.perf_counter() - start_time >= timeout:
44 raise TimeoutError('Waited too long for the port {} '
45 'on host {} to accept connection.'
46 .format(port, host)) from ex
47
.. # ...
接下来,它试图连接到Reach RPC服务器,如果没有足够快的反应,就会抛出一个错误。
.. # ...
52 def rpc(m, *args):
53 lab = 'RPC %s %s' % (m, json.dumps([*args]))
54 debug(lab)
55 ans = requests.post('https://%s:%s%s' % (host, port, m),
56 json = [*args],
57 headers = {'X-API-Key': key},
58 verify = verify)
59 ans.raise_for_status()
60 debug('%s ==> %s' % (lab, json.dumps(ans.json())))
61 return ans.json()
62
.. # ...
它定义了一个以后会返回的函数函数rpc
,以实现同步值RPC方法)的协议。它格式化一个给定的请求,发布它,最终返回反序列化的结果。为了方便起见,程序还打印了调试信息。
.. # ...
63 def rpc_callbacks(m, arg, cbacks):
64 vals = {k: v for k, v in cbacks.items() if not callable(v)}
65 meths = {k: True for k, v in cbacks.items() if callable(v)}
66 p = rpc(m, arg, vals, meths)
67
68 while True:
69 if p['t'] == 'Done':
70 return p
71
72 elif p['t'] == 'Kont':
73 cback = cbacks[p['m']]
74 ans = cback(*p['args'])
75 p = rpc('/kont', p['kid'], ans)
76
77 else:
78 raise Exception('Illegal callback return: %s' % json.dumps(p))
79
.. # ...
它定义了一个以后会返回的函数rpc_callbacks
,实现了交互式RPC方法)的协议。在第64行和第65行,该函数检查了它的第三个参数cbacks
,并将callable
参数与值分开,并创建了中间对象vals
和meths
,以提供RPC的调用。在调用后,第68行开始的while
循环中,它会对结果进行检查以确定它是一个最终的调用结果还是一个交互式RPC回调),如果它是一个回调,正如第72行的代码所示,则提取方法的名称p['m']
,并在原来的第三个参数cbacks
中用提供的参数调用它。它会将p
值替换为该继续调用的结果并继续执行程序逻辑。
.. # ...
80 return rpc, rpc_callbacks
最终,将返回rpc
和rpc_callbacks
给用户。