1. # coding: utf8
    2. import json
    3. import base64
    4. import hashlib
    5. import requests
    6. from typing import List, Optional, Union
    7. from requests import Session
    8. from airflow.exceptions import AirflowException
    9. from airflow.configuration import conf
    10. from airflow.providers.http.hooks.http import HttpHook
    11. class WorkWeChatHook(HttpHook):
    12. """
    13. This hook allows you send Dingding message using Dingding custom bot.
    14. Get Dingding token from conn_id.password. And prefer set domain to
    15. conn_id.host, if not will use default ``https://oapi.dingtalk.com``.
    16. For more detail message in
    17. `Dingding custom bot <https://open-doc.dingtalk.com/microapp/serverapi2/qf2nxq>`_
    18. :param dingding_conn_id: The name of the Dingding connection to use
    19. :type dingding_conn_id: str
    20. :param message_type: Message type you want to send to Dingding, support five type so far
    21. including text, link, markdown, actionCard, feedCard
    22. :type message_type: str
    23. :param message: The message send to Dingding chat group
    24. :type message: str or dict
    25. :param at_mobiles: Remind specific users with this message
    26. :type at_mobiles: list[str]
    27. :param at_all: Remind all people in group or not. If True, will overwrite ``at_mobiles``
    28. :type at_all: bool
    29. >>> from custom_hooks.workwechat import WorkWeChatHook
    30. >>>
    31. """
    32. conn_name_attr = 'workwechat_conn_id'
    33. default_conn_name = 'workwechat_default'
    34. conn_type = 'workwechat'
    35. hook_name = 'WorkWeChat'
    36. def __init__(
    37. self,
    38. workwechat_conn_id='workwechat_default',
    39. message_type: str = 'text',
    40. messages: Optional[List[str]] = None,
    41. images: Optional[List[str]] = None,
    42. files: Optional[List[str]] = None,
    43. artitles: Optional[List[str]] = None,
    44. at_mobiles: Optional[List[str]] = None,
    45. at_all: bool = False,
    46. *args,
    47. **kwargs,
    48. ) -> None:
    49. """Initialization
    50. Args:
    51. dingding_conn_id (str, optional): [description]. Defaults to 'workwechat_default'.
    52. message_type (str, optional): [description]. Defaults to 'text'.
    53. message (Optional[Union[str, dict]], optional): [description]. Defaults to None.
    54. at_mobiles (Optional[List[str]], optional): [description]. Defaults to None.
    55. at_all (bool, optional): [description]. Defaults to False.
    56. """
    57. super().__init__(http_conn_id=workwechat_conn_id, *args, **kwargs)
    58. self.message_type = message_type
    59. self.messages = messages
    60. self.images = images
    61. self.files = files
    62. self.artitles = artitles
    63. self.at_mobiles = at_mobiles
    64. self.at_all = at_all
    65. workwechat_key = conf.get('robots', 'workwechat_key')
    66. self.web_hook = f"https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={workwechat_key}"
    67. self.upload_hook = f"https://qyapi.weixin.qq.com/cgi-bin/webhook/upload_media?key={workwechat_key}&type=file"
    68. self.headers = {
    69. "Content-Type": "application/json;text/plain;charset=UTF-8",
    70. "Accept": "application/json;charset=UTF-8",
    71. "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
    72. "Chrome/79.0.3945.79 Safari/537.36 "
    73. }
    74. def _build_message(self):
    75. preloads = []
    76. if self.message_type in ["text", "markdown"]:
    77. for msg in self.messages:
    78. data = {
    79. "msgtype": self.message_type,
    80. self.message_type: {"content": msg}
    81. }
    82. if self.message_type == "text":
    83. data["text"]["mentioned_mobile_list"] = self.at_mobiles if not self.at_all else ["@all"]
    84. preloads.append(data)
    85. elif self.message_type == "image":
    86. for image in self.images:
    87. with open(image, "rb") as img:
    88. base64_ = str(base64.b64encode(img.read()), encoding='utf-8')
    89. with open(image, "rb") as img:
    90. md = hashlib.md5()
    91. md.update(img.read())
    92. md5_ = md.hexdigest()
    93. data = {
    94. "msgtype": "image",
    95. "image": {
    96. "base64": base64_,
    97. "md5": md5_,
    98. }
    99. }
    100. preloads.append(data)
    101. elif self.message_type == "news":
    102. data = {
    103. "msgtype": "news",
    104. "news": {
    105. "articles": self.articles
    106. }
    107. }
    108. preloads.append(data)
    109. elif self.message_type == "file":
    110. for file in self.files:
    111. with open(file, "rb") as fp:
    112. data = {"file": fp}
    113. res = requests.post(self.upload_hook, files=data)
    114. media_id = res.json()["media_id"]
    115. data = {
    116. "msgtype": "file",
    117. "file": {
    118. "media_id": media_id
    119. }
    120. }
    121. preloads.append(data)
    122. else:
    123. raise AirflowException("`message_type` must in ['text', 'markdown', 'image', 'news', 'file']")
    124. return preloads
    125. def send(self):
    126. preloads = self._build_message()
    127. for preload in preloads:
    128. res = requests.post(self.web_hook, json=preload, verify=False)
    129. if json.loads(res.text)["errmsg"] == "ok":
    130. self.log.info("successful")
    131. else:
    132. self.log.info.info("failed")