# coding: utf8
import json
import base64
import hashlib
import requests
from typing import List, Optional, Union
from requests import Session
from airflow.exceptions import AirflowException
from airflow.configuration import conf
from airflow.providers.http.hooks.http import HttpHook
class WorkWeChatHook(HttpHook):
    """
    Send messages to a WorkWeChat (WeCom) group chat through a group-robot webhook.

    The robot key is read from the Airflow configuration (section ``robots``,
    option ``workwechat_key``) and embedded in the webhook URLs on
    ``https://qyapi.weixin.qq.com``.

    :param workwechat_conn_id: Connection id forwarded to ``HttpHook`` as
        ``http_conn_id``.
    :param message_type: One of ``text``, ``markdown``, ``image``, ``news``
        or ``file``.
    :param messages: Message bodies; one request is sent per entry
        (``text`` / ``markdown`` only).
    :param images: Paths of image files; one request per image (``image`` only).
    :param files: Paths of files to upload and then send (``file`` only).
    :param artitles: Article entries sent as the ``articles`` field of a
        ``news`` message. The parameter name keeps its historical spelling so
        existing keyword callers continue to work.
    :param at_mobiles: Mobile numbers to mention (``text`` only).
    :param at_all: Mention everyone (``text`` only). When true it takes
        precedence over ``at_mobiles``.

    Example::

        from custom_hooks.workwechat import WorkWeChatHook

        WorkWeChatHook(message_type="text", messages=["hello"]).send()
    """

    conn_name_attr = 'workwechat_conn_id'
    default_conn_name = 'workwechat_default'
    conn_type = 'workwechat'
    hook_name = 'WorkWeChat'

    def __init__(
        self,
        workwechat_conn_id='workwechat_default',
        message_type: str = 'text',
        messages: Optional[List[str]] = None,
        images: Optional[List[str]] = None,
        files: Optional[List[str]] = None,
        artitles: Optional[List[dict]] = None,
        at_mobiles: Optional[List[str]] = None,
        at_all: bool = False,
        *args,
        **kwargs,
    ) -> None:
        """Store the message settings and build the webhook URLs.

        Args:
            workwechat_conn_id: Passed to ``HttpHook`` as ``http_conn_id``.
            message_type: Kind of message to build; validated when the
                message is built, not here.
            messages: Contents for ``text`` / ``markdown`` messages.
            images: Image file paths for ``image`` messages.
            files: File paths for ``file`` messages.
            artitles: Article entries for ``news`` messages.
            at_mobiles: Mobile numbers to mention in ``text`` messages.
            at_all: Mention everyone instead of ``at_mobiles``.
            *args: Extra positional arguments forwarded to ``HttpHook``.
            **kwargs: Extra keyword arguments forwarded to ``HttpHook``.
        """
        super().__init__(*args, http_conn_id=workwechat_conn_id, **kwargs)
        self.message_type = message_type
        self.messages = messages
        self.images = images
        self.files = files
        # The misspelled attribute is kept for backward compatibility;
        # ``articles`` is the name the message builder actually reads.
        self.artitles = artitles
        self.articles = artitles
        self.at_mobiles = at_mobiles
        self.at_all = at_all
        workwechat_key = conf.get('robots', 'workwechat_key')
        self.web_hook = f"https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={workwechat_key}"
        self.upload_hook = f"https://qyapi.weixin.qq.com/cgi-bin/webhook/upload_media?key={workwechat_key}&type=file"
        self.headers = {
            "Content-Type": "application/json;text/plain;charset=UTF-8",
            "Accept": "application/json;charset=UTF-8",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
                          "Chrome/79.0.3945.79 Safari/537.36 "
        }

    def _upload_file(self, path):
        """Upload ``path`` to the robot's media endpoint and return its ``media_id``.

        Raises:
            AirflowException: If the upload response carries no ``media_id``.
        """
        # The file handle must stay open while requests streams it.
        with open(path, "rb") as fp:
            res = requests.post(self.upload_hook, files={"file": fp})
        body = res.json()
        media_id = body.get("media_id")
        if not media_id:
            raise AirflowException(
                f"WorkWeChat file upload failed for {path!r}: {body.get('errmsg')}"
            )
        return media_id

    def _build_message(self):
        """Build the list of JSON payloads for the configured ``message_type``.

        For the ``file`` type this uploads every file first, so it performs
        network I/O. A source list that is ``None`` or empty yields no payloads.

        Returns:
            A list of dicts, each one the JSON body of a single webhook request.

        Raises:
            AirflowException: If ``message_type`` is not supported, or a file
                upload does not return a ``media_id``.
        """
        payloads = []
        if self.message_type in ("text", "markdown"):
            for content in self.messages or []:
                payload = {
                    "msgtype": self.message_type,
                    self.message_type: {"content": content},
                }
                if self.message_type == "text":
                    # Mentions are only attached to plain text messages.
                    payload["text"]["mentioned_mobile_list"] = (
                        ["@all"] if self.at_all else (self.at_mobiles or [])
                    )
                payloads.append(payload)
        elif self.message_type == "image":
            for path in self.images or []:
                # Read once; both the base64 body and the MD5 digest are
                # derived from the same bytes.
                with open(path, "rb") as img:
                    raw = img.read()
                payloads.append({
                    "msgtype": "image",
                    "image": {
                        "base64": base64.b64encode(raw).decode("utf-8"),
                        "md5": hashlib.md5(raw).hexdigest(),
                    },
                })
        elif self.message_type == "news":
            if self.articles:
                payloads.append({
                    "msgtype": "news",
                    "news": {"articles": self.articles},
                })
        elif self.message_type == "file":
            for path in self.files or []:
                payloads.append({
                    "msgtype": "file",
                    "file": {"media_id": self._upload_file(path)},
                })
        else:
            raise AirflowException("`message_type` must in ['text', 'markdown', 'image', 'news', 'file']")
        return payloads

    def send(self):
        """Post every built payload to the webhook and log the outcome of each.

        A rejected message is logged and the remaining payloads are still sent.
        """
        for payload in self._build_message():
            # NOTE(review): verify=False disables TLS certificate checking while
            # the robot key travels in the URL; kept to preserve existing
            # behaviour - confirm whether it is still required.
            res = requests.post(self.web_hook, json=payload, verify=False)
            if json.loads(res.text).get("errmsg") == "ok":
                self.log.info("successful")
            else:
                self.log.error("failed: %s", res.text)