1.使用request解决

  1. import requests
  2. headers = {
  3. "contentType": "application/json"
  4. }
  5. def register(name, id, address, port):
  6. url = "http://192.168.119.128:8500/v1/agent/service/register"
  7. rsp = requests.put(url, headers=headers, json={
  8. "Name": name,
  9. "ID": id,
  10. "Tags": ["mxshop", "bobby", "imooc", "web"],
  11. "Address": address,
  12. "Port": port,
  13. })
  14. if rsp.status_code == 200:
  15. print("注册成功")
  16. else:
  17. print(f"注册失败: {rsp.status_code}")
  18. def deregistry(id):
  19. url = f"http://192.168.119.128:8500/v1/agent/service/deregister/{id}"
  20. rsp = requests.put(url, headers=headers)
  21. if rsp.status_code == 200:
  22. print("注销成功")
  23. else:
  24. print(f"注销失败: {rsp.status_code}")
  25. if __name__ == "__main__":
  26. # register("mxshop-web", "mxshop-web", "127.0.0.1", 50051)
  27. deregistry("mxshop-web")

2.健康检查的服务注册

  1. import requests
  2. headers = {
  3. "contentType": "application/json"
  4. }
  5. def register(name, id, address, port):
  6. url = "http://192.168.119.128:8500/v1/agent/service/register"
  7. rsp = requests.put(url, headers=headers, json={
  8. "Name": name,
  9. "ID": id,
  10. "Tags": ["mxshop", "bobby", "imooc", "web"],
  11. "Address": address,
  12. "Port": port,
  13. "Check": {
  14. "HTTP": f"http://{address}:{port}/health",
  15. "Timeout": "5s",
  16. "Interval": "5s",
  17. "DeregisterCriticalServiceAfter": "15s",
  18. }
  19. })
  20. if rsp.status_code == 200:
  21. print("注册成功")
  22. else:
  23. print(f"注册失败: {rsp.status_code}")
  24. def deregistry(id):
  25. url = f"http://192.168.119.128:8500/v1/agent/service/deregister/{id}"
  26. rsp = requests.put(url, headers=headers)
  27. if rsp.status_code == 200:
  28. print("注销成功")
  29. else:
  30. print(f"注销失败: {rsp.status_code}")
  31. if __name__ == "__main__":
  32. register("mxshop-web", "mxshop-web", "192.168.245.1", 8021)
  33. # deregistry("mxshop-web")

3.过滤服务

  1. def filter_service(name):
  2. url = "http://192.168.119.128:8500/v1/agent/services"
  3. params = {
  4. "filter": f'Service == "{name}"'
  5. }
  6. rsp = requests.get(url, params=params).json()
  7. for key, value in rsp.items():
  8. print(key)

4.使用第三方库

安装

  1. pip install python-consul2
  1. import consul
  2. import json
  3. c = consul.Consul(host="192.168.119.128")
  4. address = "192.168.119.129"
  5. port = 50051
  6. check = {
  7. "GRPC": f"{address}:{port}",
  8. "GRPCUseTLS": False, # 不需要做安全验证
  9. "Timeout": "5s",
  10. "Interval": "5s",
  11. "DeregisterCriticalServiceAfter": "15s",
  12. }
  13. rsp = c.agent.service.register(name="user-srv", service_id="user-srv", address=address, port=port,
  14. tags=["mxshop"], check=check)
  15. rsp = c.agent.services()
  16. json.loads(rsp)
  17. rsp = c.agent.service.deregister("user-srv")
  18. print(rsp)