1.使用request解决
import requests
headers = {
"contentType": "application/json"
}
def register(name, id, address, port):
url = "http://192.168.119.128:8500/v1/agent/service/register"
rsp = requests.put(url, headers=headers, json={
"Name": name,
"ID": id,
"Tags": ["mxshop", "bobby", "imooc", "web"],
"Address": address,
"Port": port,
})
if rsp.status_code == 200:
print("注册成功")
else:
print(f"注册失败: {rsp.status_code}")
def deregistry(id):
url = f"http://192.168.119.128:8500/v1/agent/service/deregister/{id}"
rsp = requests.put(url, headers=headers)
if rsp.status_code == 200:
print("注销成功")
else:
print(f"注销失败: {rsp.status_code}")
if __name__ == "__main__":
# register("mxshop-web", "mxshop-web", "127.0.0.1", 50051)
deregistry("mxshop-web")
2.健康检查的服务注册
import requests
headers = {
"contentType": "application/json"
}
def register(name, id, address, port):
url = "http://192.168.119.128:8500/v1/agent/service/register"
rsp = requests.put(url, headers=headers, json={
"Name": name,
"ID": id,
"Tags": ["mxshop", "bobby", "imooc", "web"],
"Address": address,
"Port": port,
"Check": {
"HTTP": f"http://{address}:{port}/health",
"Timeout": "5s",
"Interval": "5s",
"DeregisterCriticalServiceAfter": "15s",
}
})
if rsp.status_code == 200:
print("注册成功")
else:
print(f"注册失败: {rsp.status_code}")
def deregistry(id):
url = f"http://192.168.119.128:8500/v1/agent/service/deregister/{id}"
rsp = requests.put(url, headers=headers)
if rsp.status_code == 200:
print("注销成功")
else:
print(f"注销失败: {rsp.status_code}")
if __name__ == "__main__":
register("mxshop-web", "mxshop-web", "192.168.245.1", 8021)
# deregistry("mxshop-web")
3.过滤服务
def filter_service(name):
url = "http://192.168.119.128:8500/v1/agent/services"
params = {
"filter": f'Service == "{name}"'
}
rsp = requests.get(url, params=params).json()
for key, value in rsp.items():
print(key)
4.使用第三方库
安装
pip install python-consul2
import consul
import json
c = consul.Consul(host="192.168.119.128")
address = "192.168.119.129"
port = 50051
check = {
"GRPC": f"{address}:{port}",
"GRPCUseTLS": False, # 不需要做安全验证
"Timeout": "5s",
"Interval": "5s",
"DeregisterCriticalServiceAfter": "15s",
}
rsp = c.agent.service.register(name="user-srv", service_id="user-srv", address=address, port=port,
tags=["mxshop"], check=check)
rsp = c.agent.services()
json.loads(rsp)
rsp = c.agent.service.deregister("user-srv")
print(rsp)