from __future__ import absolute_import, unicode_literalsimport celeryfrom celery import taskimport jsonimport timeimport loggingimport tracebackimport requestsfrom utils.ansible_api import AnsibleRunnerlogger = logging.getLogger('django')def initialize_callback(logger, callback_url, container_id, status): '''容器初始化状态回调 status: 1: 新增成功, 2: 新增失败, 3: 删除成功, 4:删除失败 ''' with requests.Session() as s: url = f"{callback_url}/{container_id}/{status}" logger.info(url) a = requests.adapters.HTTPAdapter(max_retries=60) s.mount('http://', a) _retries = 3 while _retries > 0: r = s.get(url) if r.status_code == 200: logger.info(f"notify success - {container_id}/{status} - {str(r.content)}") break _retries -= 1 time.sleep(10) else: logger.error(f"notify failed - {container_id}/{status} - {str(r.content)}")class InitializeCallbackTask(celery.Task): '''容器初始化回调''' def __init__(self): super(InitializeCallbackTask, self).__init__() def on_success(self, retval, task_id, args, kwargs): need_callback = kwargs.get('need_callback') do_action = kwargs['DOACTION'] if need_callback: try: callback_url = kwargs.get('callback_url') container_id = kwargs.get('container_id') status = 1 if do_action == 'INITCON' else 3 # user_name = kwargs.get('user_name') initialize_callback(logger, callback_url, container_id, status) except Exception: logger.error("initialize success, notify failed.\n" + traceback.format_exc()) def on_failure(self, exc, task_id, args, kwargs, einfo): need_callback = kwargs.get('need_callback') logger.error(('Task {0} raised exception: {1} {2}'.format(task_id, exc, einfo.traceback))) if need_callback: try: callback_url = kwargs.get('callback_url') container_id = kwargs.get('container_id') do_action = kwargs['DOACTION'] status = 2 if do_action == 'INITCON' else 4 # user_name = kwargs.get('user_name') initialize_callback(logger, callback_url, container_id, status) except Exception: logger.error("initialize failed, notify failed.\n" + traceback.format_exc())@task(base=InitializeCallbackTask)def initialize_task_v1(ansible_hosts, playbook, need_callback=False, callback_url=None, **kwargs): '''容器初始化任务''' logger.info(f"ansible-playbook: {playbook} - {json.dumps(kwargs)}") def do_action(_resource, playbook, kwargs): rbt = AnsibleRunner(_resource) rbt.run_playbook(playbook, kwargs) _res = rbt.get_playbook_result() failed_msg = _res.get('failed') skip_msg = _res.get('skipped') ok_msg = _res.get('ok') status_msg = _res.get('status') unreach_msg = _res.get('unreachable') changed_msg = _res.get('changed') if failed_msg: raise Exception(failed_msg) if unreach_msg: raise Exception(unreach_msg) if not failed_msg and not skip_msg and not ok_msg and not status_msg and not unreach_msg and not changed_msg: raise Exception({'msg': 'ansible-playbook task was not executed'}) return _res if isinstance(ansible_hosts, dict): return do_action(ansible_hosts, playbook, kwargs) elif isinstance(ansible_hosts, list): _result = [] for _resource in ansible_hosts: for k, _ in _resource.items(): kwargs.update({"host": k}) _result.append(do_action(_resource, playbook, kwargs)) return _result else: raise Exception('wrong inventory struct')