1. from __future__ import absolute_import, unicode_literals
    2. import celery
    3. from celery import task
    4. import json
    5. import time
    6. import logging
    7. import traceback
    8. import requests
    9. from utils.ansible_api import AnsibleRunner
    10. logger = logging.getLogger('django')
    11. def initialize_callback(logger, callback_url, container_id, status):
    12. '''容器初始化状态回调
    13. status:
    14. 1: 新增成功, 2: 新增失败, 3: 删除成功, 4:删除失败
    15. '''
    16. with requests.Session() as s:
    17. url = f"{callback_url}/{container_id}/{status}"
    18. logger.info(url)
    19. a = requests.adapters.HTTPAdapter(max_retries=60)
    20. s.mount('http://', a)
    21. _retries = 3
    22. while _retries > 0:
    23. r = s.get(url)
    24. if r.status_code == 200:
    25. logger.info(f"notify success - {container_id}/{status} - {str(r.content)}")
    26. break
    27. _retries -= 1
    28. time.sleep(10)
    29. else:
    30. logger.error(f"notify failed - {container_id}/{status} - {str(r.content)}")
    31. class InitializeCallbackTask(celery.Task):
    32. '''容器初始化回调'''
    33. def __init__(self):
    34. super(InitializeCallbackTask, self).__init__()
    35. def on_success(self, retval, task_id, args, kwargs):
    36. need_callback = kwargs.get('need_callback')
    37. do_action = kwargs['DOACTION']
    38. if need_callback:
    39. try:
    40. callback_url = kwargs.get('callback_url')
    41. container_id = kwargs.get('container_id')
    42. status = 1 if do_action == 'INITCON' else 3
    43. # user_name = kwargs.get('user_name')
    44. initialize_callback(logger, callback_url, container_id, status)
    45. except Exception:
    46. logger.error("initialize success, notify failed.\n" + traceback.format_exc())
    47. def on_failure(self, exc, task_id, args, kwargs, einfo):
    48. need_callback = kwargs.get('need_callback')
    49. logger.error(('Task {0} raised exception: {1} {2}'.format(task_id, exc, einfo.traceback)))
    50. if need_callback:
    51. try:
    52. callback_url = kwargs.get('callback_url')
    53. container_id = kwargs.get('container_id')
    54. do_action = kwargs['DOACTION']
    55. status = 2 if do_action == 'INITCON' else 4
    56. # user_name = kwargs.get('user_name')
    57. initialize_callback(logger, callback_url, container_id, status)
    58. except Exception:
    59. logger.error("initialize failed, notify failed.\n" + traceback.format_exc())
    60. @task(base=InitializeCallbackTask)
    61. def initialize_task_v1(ansible_hosts, playbook, need_callback=False, callback_url=None, **kwargs):
    62. '''容器初始化任务'''
    63. logger.info(f"ansible-playbook: {playbook} - {json.dumps(kwargs)}")
    64. def do_action(_resource, playbook, kwargs):
    65. rbt = AnsibleRunner(_resource)
    66. rbt.run_playbook(playbook, kwargs)
    67. _res = rbt.get_playbook_result()
    68. failed_msg = _res.get('failed')
    69. skip_msg = _res.get('skipped')
    70. ok_msg = _res.get('ok')
    71. status_msg = _res.get('status')
    72. unreach_msg = _res.get('unreachable')
    73. changed_msg = _res.get('changed')
    74. if failed_msg:
    75. raise Exception(failed_msg)
    76. if unreach_msg:
    77. raise Exception(unreach_msg)
    78. if not failed_msg and not skip_msg and not ok_msg and not status_msg and not unreach_msg and not changed_msg:
    79. raise Exception({'msg': 'ansible-playbook task was not executed'})
    80. return _res
    81. if isinstance(ansible_hosts, dict):
    82. return do_action(ansible_hosts, playbook, kwargs)
    83. elif isinstance(ansible_hosts, list):
    84. _result = []
    85. for _resource in ansible_hosts:
    86. for k, _ in _resource.items():
    87. kwargs.update({"host": k})
    88. _result.append(do_action(_resource, playbook, kwargs))
    89. return _result
    90. else:
    91. raise Exception('wrong inventory struct')