1.使用requests库解决
import requests

# Standard header name; the previous key "contentType" is not a recognised
# HTTP header and was sent to the agent as a meaningless custom header.
headers = {"Content-Type": "application/json"}

# Seconds to wait for the agent before giving up. Without a timeout,
# requests blocks indefinitely when the agent is unreachable.
REQUEST_TIMEOUT = 5


def register(name, id, address, port):
    """Register a service with the Consul agent over its HTTP API.

    Sends a PUT to ``/v1/agent/service/register`` and prints whether the
    agent answered with HTTP 200.

    Args:
        name: Service name sent as ``Name``.
        id: Service ID sent as ``ID``. (Shadows the builtin ``id``; the
            parameter name is kept so existing keyword callers still work.)
        address: Address the service is reachable at, sent as ``Address``.
        port: Port the service listens on, sent as ``Port``.

    Raises:
        requests.RequestException: If the agent cannot be reached or does
            not answer within ``REQUEST_TIMEOUT`` seconds.
    """
    url = "http://192.168.119.128:8500/v1/agent/service/register"
    payload = {
        "Name": name,
        "ID": id,
        "Tags": ["mxshop", "bobby", "imooc", "web"],
        "Address": address,
        "Port": port,
    }
    rsp = requests.put(url, headers=headers, json=payload, timeout=REQUEST_TIMEOUT)
    if rsp.status_code == 200:
        print("注册成功")
    else:
        print(f"注册失败: {rsp.status_code}")


def deregistry(id):
    """Deregister the service with the given ID from the Consul agent.

    Sends a PUT to ``/v1/agent/service/deregister/{id}`` and prints whether
    the agent answered with HTTP 200.

    Args:
        id: ID of the service to remove, interpolated into the URL path.

    Raises:
        requests.RequestException: If the agent cannot be reached or does
            not answer within ``REQUEST_TIMEOUT`` seconds.
    """
    url = f"http://192.168.119.128:8500/v1/agent/service/deregister/{id}"
    rsp = requests.put(url, headers=headers, timeout=REQUEST_TIMEOUT)
    if rsp.status_code == 200:
        print("注销成功")
    else:
        print(f"注销失败: {rsp.status_code}")


if __name__ == "__main__":
    # register("mxshop-web", "mxshop-web", "127.0.0.1", 50051)
    deregistry("mxshop-web")
2.带健康检查的服务注册
import requests

# Standard header name; the previous key "contentType" is not a recognised
# HTTP header and was sent to the agent as a meaningless custom header.
headers = {"Content-Type": "application/json"}

# Seconds to wait for the agent before giving up. Without a timeout,
# requests blocks indefinitely when the agent is unreachable.
REQUEST_TIMEOUT = 5


def register(name, id, address, port):
    """Register a service, with an HTTP health check, on the Consul agent.

    Sends a PUT to ``/v1/agent/service/register`` whose payload includes a
    ``Check`` section pointing at ``http://{address}:{port}/health``, then
    prints whether the agent answered with HTTP 200.

    Args:
        name: Service name sent as ``Name``.
        id: Service ID sent as ``ID``. (Shadows the builtin ``id``; the
            parameter name is kept so existing keyword callers still work.)
        address: Address the service is reachable at; also used to build
            the health-check URL.
        port: Port the service listens on; also used to build the
            health-check URL.

    Raises:
        requests.RequestException: If the agent cannot be reached or does
            not answer within ``REQUEST_TIMEOUT`` seconds.
    """
    url = "http://192.168.119.128:8500/v1/agent/service/register"
    payload = {
        "Name": name,
        "ID": id,
        "Tags": ["mxshop", "bobby", "imooc", "web"],
        "Address": address,
        "Port": port,
        "Check": {
            # The service itself must serve this endpoint for the check
            # to pass.
            "HTTP": f"http://{address}:{port}/health",
            "Timeout": "5s",
            "Interval": "5s",
            "DeregisterCriticalServiceAfter": "15s",
        },
    }
    rsp = requests.put(url, headers=headers, json=payload, timeout=REQUEST_TIMEOUT)
    if rsp.status_code == 200:
        print("注册成功")
    else:
        print(f"注册失败: {rsp.status_code}")


def deregistry(id):
    """Deregister the service with the given ID from the Consul agent.

    Sends a PUT to ``/v1/agent/service/deregister/{id}`` and prints whether
    the agent answered with HTTP 200.

    Args:
        id: ID of the service to remove, interpolated into the URL path.

    Raises:
        requests.RequestException: If the agent cannot be reached or does
            not answer within ``REQUEST_TIMEOUT`` seconds.
    """
    url = f"http://192.168.119.128:8500/v1/agent/service/deregister/{id}"
    rsp = requests.put(url, headers=headers, timeout=REQUEST_TIMEOUT)
    if rsp.status_code == 200:
        print("注销成功")
    else:
        print(f"注销失败: {rsp.status_code}")


if __name__ == "__main__":
    register("mxshop-web", "mxshop-web", "192.168.245.1", 8021)
    # deregistry("mxshop-web")
3.过滤服务
def filter_service(name):
    """Print the key of every agent service whose ``Service`` equals *name*.

    Queries ``/v1/agent/services`` with a ``filter`` expression and prints
    each top-level key of the returned JSON object (presumably the service
    IDs; confirm against the Consul API reference).

    Args:
        name: Service name to match; interpolated into the filter
            expression ``Service == "<name>"``.

    Raises:
        requests.RequestException: If the agent cannot be reached or does
            not answer within the timeout.
    """
    url = "http://192.168.119.128:8500/v1/agent/services"
    params = {"filter": f'Service == "{name}"'}
    # A timeout keeps the call from hanging forever on an unreachable agent.
    services = requests.get(url, params=params, timeout=5).json()
    # Only the keys are printed, so iterate the mapping directly instead of
    # unpacking values that are never used.
    for service_key in services:
        print(service_key)
4.使用第三方库
安装
pip install python-consul2
import json

import consul

# Client for the Consul agent; only the host is overridden here.
c = consul.Consul(host="192.168.119.128")

# Address and port of the service being registered.
address = "192.168.119.129"
port = 50051

# Health-check definition passed through to the agent.
check = {
    "GRPC": f"{address}:{port}",
    "GRPCUseTLS": False,  # no TLS verification needed
    "Timeout": "5s",
    "Interval": "5s",
    "DeregisterCriticalServiceAfter": "15s",
}

# Register the service together with its gRPC health check.
rsp = c.agent.service.register(
    name="user-srv",
    service_id="user-srv",
    address=address,
    port=port,
    tags=["mxshop"],
    check=check,
)

# agent.services() already returns decoded JSON (a dict), so passing it to
# json.loads() raises TypeError. Serialise it for display instead.
rsp = c.agent.services()
print(json.dumps(rsp, indent=2, ensure_ascii=False))

# Remove the service again and show the client's result.
rsp = c.agent.service.deregister("user-srv")
print(rsp)
