from __future__ import absolute_import, unicode_literals
import celery
from celery import task
import json
import time
import logging
import traceback
import requests
from utils.ansible_api import AnsibleRunner
logger = logging.getLogger('django')
def initialize_callback(logger, callback_url, container_id, status):
'''容器初始化状态回调
status:
1: 新增成功, 2: 新增失败, 3: 删除成功, 4:删除失败
'''
with requests.Session() as s:
url = f"{callback_url}/{container_id}/{status}"
logger.info(url)
a = requests.adapters.HTTPAdapter(max_retries=60)
s.mount('http://', a)
_retries = 3
while _retries > 0:
r = s.get(url)
if r.status_code == 200:
logger.info(f"notify success - {container_id}/{status} - {str(r.content)}")
break
_retries -= 1
time.sleep(10)
else:
logger.error(f"notify failed - {container_id}/{status} - {str(r.content)}")
class InitializeCallbackTask(celery.Task):
'''容器初始化回调'''
def __init__(self):
super(InitializeCallbackTask, self).__init__()
def on_success(self, retval, task_id, args, kwargs):
need_callback = kwargs.get('need_callback')
do_action = kwargs['DOACTION']
if need_callback:
try:
callback_url = kwargs.get('callback_url')
container_id = kwargs.get('container_id')
status = 1 if do_action == 'INITCON' else 3
# user_name = kwargs.get('user_name')
initialize_callback(logger, callback_url, container_id, status)
except Exception:
logger.error("initialize success, notify failed.\n" + traceback.format_exc())
def on_failure(self, exc, task_id, args, kwargs, einfo):
need_callback = kwargs.get('need_callback')
logger.error(('Task {0} raised exception: {1} {2}'.format(task_id, exc, einfo.traceback)))
if need_callback:
try:
callback_url = kwargs.get('callback_url')
container_id = kwargs.get('container_id')
do_action = kwargs['DOACTION']
status = 2 if do_action == 'INITCON' else 4
# user_name = kwargs.get('user_name')
initialize_callback(logger, callback_url, container_id, status)
except Exception:
logger.error("initialize failed, notify failed.\n" + traceback.format_exc())
@task(base=InitializeCallbackTask)
def initialize_task_v1(ansible_hosts, playbook, need_callback=False, callback_url=None, **kwargs):
'''容器初始化任务'''
logger.info(f"ansible-playbook: {playbook} - {json.dumps(kwargs)}")
def do_action(_resource, playbook, kwargs):
rbt = AnsibleRunner(_resource)
rbt.run_playbook(playbook, kwargs)
_res = rbt.get_playbook_result()
failed_msg = _res.get('failed')
skip_msg = _res.get('skipped')
ok_msg = _res.get('ok')
status_msg = _res.get('status')
unreach_msg = _res.get('unreachable')
changed_msg = _res.get('changed')
if failed_msg:
raise Exception(failed_msg)
if unreach_msg:
raise Exception(unreach_msg)
if not failed_msg and not skip_msg and not ok_msg and not status_msg and not unreach_msg and not changed_msg:
raise Exception({'msg': 'ansible-playbook task was not executed'})
return _res
if isinstance(ansible_hosts, dict):
return do_action(ansible_hosts, playbook, kwargs)
elif isinstance(ansible_hosts, list):
_result = []
for _resource in ansible_hosts:
for k, _ in _resource.items():
kwargs.update({"host": k})
_result.append(do_action(_resource, playbook, kwargs))
return _result
else:
raise Exception('wrong inventory struct')