# coding: utf8
import json
import base64
import hashlib
import requests
from typing import List, Optional, Union
from requests import Session
from airflow.exceptions import AirflowException
from airflow.configuration import conf
from airflow.providers.http.hooks.http import HttpHook


class WorkWeChatHook(HttpHook):
    """
    Send messages through a WorkWeChat webhook robot.

    The robot key is read from the Airflow configuration (section ``robots``,
    option ``workwechat_key``) and appended to the fixed
    ``https://qyapi.weixin.qq.com/cgi-bin/webhook/...`` endpoints.
    ``workwechat_conn_id`` is only forwarded to :class:`HttpHook` as
    ``http_conn_id``; this class does not use it to build the request URLs.

    :param workwechat_conn_id: Connection id forwarded to ``HttpHook``.
    :param message_type: One of ``text``, ``markdown``, ``image``, ``news``
        or ``file``. It is validated when the message is built, not here.
    :param messages: Message bodies; one request is sent per entry
        (``text`` and ``markdown`` types).
    :param images: Paths of image files; one request is sent per file
        (``image`` type).
    :param files: Paths of files to upload and then send; one request is
        sent per file (``file`` type).
    :param artitles: Articles sent in a single ``news`` request. The
        misspelled name is kept for backward compatibility.
        NOTE(review): entries are presumably article objects as defined by
        the webhook API rather than plain strings -- confirm.
    :param at_mobiles: Mobile numbers to mention (``text`` type only).
    :param at_all: Mention everybody (``text`` type only). When true,
        ``at_mobiles`` is ignored.

    Example::

        from custom_hooks.workwechat import WorkWeChatHook

        WorkWeChatHook(message_type='text', messages=['hello']).send()
    """

    conn_name_attr = 'workwechat_conn_id'
    default_conn_name = 'workwechat_default'
    conn_type = 'workwechat'
    hook_name = 'WorkWeChat'

    def __init__(
        self,
        workwechat_conn_id='workwechat_default',
        message_type: str = 'text',
        messages: Optional[List[str]] = None,
        images: Optional[List[str]] = None,
        files: Optional[List[str]] = None,
        artitles: Optional[List[str]] = None,
        at_mobiles: Optional[List[str]] = None,
        at_all: bool = False,
        *args,
        **kwargs,
    ) -> None:
        """Store the message settings and build the webhook URLs.

        See the class docstring for the meaning of each parameter.
        """
        super().__init__(*args, http_conn_id=workwechat_conn_id, **kwargs)
        self.message_type = message_type
        self.messages = messages
        self.images = images
        self.files = files
        # Keep the historical (misspelled) attribute for existing readers and
        # expose the correctly spelled one that the message builder uses.
        self.artitles = artitles
        self.articles = artitles
        self.at_mobiles = at_mobiles
        self.at_all = at_all
        workwechat_key = conf.get('robots', 'workwechat_key')
        self.web_hook = f"https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={workwechat_key}"
        self.upload_hook = f"https://qyapi.weixin.qq.com/cgi-bin/webhook/upload_media?key={workwechat_key}&type=file"
        # NOTE: these headers are stored on the instance but are not attached
        # to the requests issued by this class.
        self.headers = {
            "Content-Type": "application/json;text/plain;charset=UTF-8",
            "Accept": "application/json;charset=UTF-8",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
                          "Chrome/79.0.3945.79 Safari/537.36 "
        }

    def _build_text_payloads(self):
        """Build one ``text``/``markdown`` payload per entry in ``messages``."""
        payloads = []
        for msg in self.messages or []:
            data = {
                "msgtype": self.message_type,
                self.message_type: {"content": msg}
            }
            if self.message_type == "text":
                # Mentions are only attached to plain-text messages.
                data["text"]["mentioned_mobile_list"] = ["@all"] if self.at_all else self.at_mobiles
            payloads.append(data)
        return payloads

    def _build_image_payloads(self):
        """Build one ``image`` payload (base64 body + MD5 digest) per image file."""
        payloads = []
        for image in self.images or []:
            # Read the file once; both encodings are derived from the same bytes.
            with open(image, "rb") as img:
                content = img.read()
            payloads.append({
                "msgtype": "image",
                "image": {
                    "base64": base64.b64encode(content).decode("utf-8"),
                    "md5": hashlib.md5(content).hexdigest(),
                }
            })
        return payloads

    def _build_news_payloads(self):
        """Build the single ``news`` payload, or nothing when there are no articles."""
        if not self.articles:
            return []
        return [{
            "msgtype": "news",
            "news": {
                "articles": self.articles
            }
        }]

    def _upload_file(self, path):
        """Upload ``path`` to ``upload_hook`` and return the resulting ``media_id``.

        :raises AirflowException: if the response carries no ``media_id``.
        """
        with open(path, "rb") as fp:
            res = requests.post(self.upload_hook, files={"file": fp})
        media_id = res.json().get("media_id")
        if not media_id:
            raise AirflowException(f"Failed to upload `{path}` to WorkWeChat: {res.text}")
        return media_id

    def _build_file_payloads(self):
        """Upload every entry in ``files`` and build one ``file`` payload each."""
        return [
            {
                "msgtype": "file",
                "file": {
                    "media_id": self._upload_file(path)
                }
            }
            for path in self.files or []
        ]

    def _build_message(self):
        """Return the list of JSON payloads to send for ``message_type``.

        ``None`` inputs are treated as empty, yielding no payloads.

        :raises AirflowException: if ``message_type`` is not supported.
        """
        if self.message_type in ["text", "markdown"]:
            return self._build_text_payloads()
        if self.message_type == "image":
            return self._build_image_payloads()
        if self.message_type == "news":
            return self._build_news_payloads()
        if self.message_type == "file":
            return self._build_file_payloads()
        raise AirflowException("`message_type` must in ['text', 'markdown', 'image', 'news', 'file']")

    def send(self):
        """Post every built payload to the webhook and log the outcome of each.

        All payloads are built (including file uploads) before the first one
        is sent. A response whose ``errmsg`` is not ``"ok"`` is logged as an
        error; it does not raise.
        """
        preloads = self._build_message()
        for preload in preloads:
            # NOTE(review): verify=False disables TLS certificate verification
            # while the URL carries the robot key; the upload request above
            # uses the default (verified) behaviour. Kept as-is in case it is
            # a deliberate workaround -- confirm and remove if possible.
            res = requests.post(self.web_hook, json=preload, verify=False)
            if json.loads(res.text).get("errmsg") == "ok":
                self.log.info("successful")
            else:
                self.log.error("failed: %s", res.text)