贡献者:@ImPerat0R_

Airflow 内置了一个简单的插件管理器,可以通过简单地删除$AIRFLOW_HOME/plugins文件夹中的文件,将外部功能集成到其核心。

plugins文件夹中的 python 模块将被导入,钩子操作符传感器执行器和 Web视图将集成到 Airflow 的主要集合中,并可供使用。

做什么的?

Airflow 提供了一个用于处理数据的通用工具箱。不同的组织有不同的堆栈和不同的需求。 使用 Airflow 插件可以让公司定制他们的 Airflow 安装以反映他们的生态系统。

插件可以简便地用作编写,共享和激活新功能集。

当然还需要一组更复杂的应用程序来与不同风格的数据和元数据进行交互。

例子:

  • 一组用于解析 Hive 日志和公开 Hive 元数据(CPU/IO/阶段/倾斜/…)的工具
  • 异常检测框架,允许人们收集指标,设置阈值和警报
  • 审计工具,帮助了解谁访问了什么
  • 配置驱动的 SLA 监控工具,允许您设置受监控的表以及应该在何时着陆,提醒人员并公开停机的可视化

为什么要建立在 Airflow 之上?

Airflow 有许多组件可以在构建应用程序时重用:

  • 可用于呈现视图的 Web 服务器
  • 用于存储模型的元数据数据库
  • 访问您的数据库,以及如何连接到它们
  • 应用程序可以将工作负载推送到的一组 Workers
  • 部署了 Airflow,您可以专注于后面的工作
  • 基本的图表功能,底层库和抽象

接口

要创建插件,您需要派生airflow.plugins_manager.AirflowPlugin类并引用要插入 Airflow 的对象。以下是类似您需要派生的类:

  1. class AirflowPlugin(object):
  2. # The name of your plugin (str)
  3. name = None
  4. # A list of class(es) derived from BaseOperator
  5. operators = []
  6. # A list of class(es) derived from BaseSensorOperator
  7. sensors = []
  8. # A list of class(es) derived from BaseHook
  9. hooks = []
  10. # A list of class(es) derived from BaseExecutor
  11. executors = []
  12. # A list of references to inject into the macros namespace
  13. macros = []
  14. # A list of objects created from a class derived
  15. # from flask_admin.BaseView
  16. admin_views = []
  17. # A list of Blueprint object created from flask.Blueprint. For use with the flask_admin based GUI
  18. flask_blueprints = []
  19. # A list of menu links (flask_admin.base.MenuLink). For use with the flask_admin based GUI
  20. menu_links = []
  21. # A list of dictionaries containing FlaskAppBuilder BaseView object and some metadata. See example below
  22. appbuilder_views = []
  23. # A list of dictionaries containing FlaskAppBuilder BaseView object and some metadata. See example below
  24. appbuilder_menu_items = []

示例

下面的代码定义了一个插件,它在 Airflow 中注入一组虚拟对象定义。

  1. # This is the class you derive to create a plugin
  2. from airflow.plugins_manager import AirflowPlugin
  3. from flask import Blueprint
  4. from flask_admin import BaseView, expose
  5. from flask_admin.base import MenuLink
  6. # Importing base classes that we need to derive
  7. from airflow.hooks.base_hook import BaseHook
  8. from airflow.models import BaseOperator
  9. from airflow.sensors.base_sensor_operator import BaseSensorOperator
  10. from airflow.executors.base_executor import BaseExecutor
  11. # Will show up under airflow.hooks.test_plugin.PluginHook
  12. class PluginHook(BaseHook):
  13. pass
  14. # Will show up under airflow.operators.test_plugin.PluginOperator
  15. class PluginOperator(BaseOperator):
  16. pass
  17. # Will show up under airflow.sensors.test_plugin.PluginSensorOperator
  18. class PluginSensorOperator(BaseSensorOperator):
  19. pass
  20. # Will show up under airflow.executors.test_plugin.PluginExecutor
  21. class PluginExecutor(BaseExecutor):
  22. pass
  23. # Will show up under airflow.macros.test_plugin.plugin_macro
  24. def plugin_macro():
  25. pass
  26. # Creating a flask admin BaseView
  27. class TestView(BaseView):
  28. @expose('/')
  29. def test(self):
  30. # in this example, put your test_plugin/test.html template at airflow/plugins/templates/test_plugin/test.html
  31. return self.render("test_plugin/test.html", content="Hello galaxy!")
  32. v = TestView(category="Test Plugin", name="Test View")
  33. # Creating a flask blueprint to integrate the templates and static folder
  34. bp = Blueprint(
  35. "test_plugin", __name__,
  36. template_folder='templates', # registers airflow/plugins/templates as a Jinja template folder
  37. static_folder='static',
  38. static_url_path='/static/test_plugin')
  39. ml = MenuLink(
  40. category='Test Plugin',
  41. name='Test Menu Link',
  42. url='https://airflow.incubator.apache.org/')
  43. # Creating a flask appbuilder BaseView
  44. class TestAppBuilderBaseView(AppBuilderBaseView):
  45. @expose("/")
  46. def test(self):
  47. return self.render("test_plugin/test.html", content="Hello galaxy!")
  48. v_appbuilder_view = TestAppBuilderBaseView()
  49. v_appbuilder_package = {"name": "Test View",
  50. "category": "Test Plugin",
  51. "view": v_appbuilder_view}
  52. # Creating a flask appbuilder Menu Item
  53. appbuilder_mitem = {"name": "Google",
  54. "category": "Search",
  55. "category_icon": "fa-th",
  56. "href": "https://www.google.com"}
  57. # Defining the plugin class
  58. class AirflowTestPlugin(AirflowPlugin):
  59. name = "test_plugin"
  60. operators = [PluginOperator]
  61. sensors = [PluginSensorOperator]
  62. hooks = [PluginHook]
  63. executors = [PluginExecutor]
  64. macros = [plugin_macro]
  65. admin_views = [v]
  66. flask_blueprints = [bp]
  67. menu_links = [ml]
  68. appbuilder_views = [v_appbuilder_package]
  69. appbuilder_menu_items = [appbuilder_mitem]

注意基于角色的视图

Airflow 1.10 使用 FlaskAppBuilder 引入了基于角色的视图。您可以通过设置 rbac = True 来配置使用的 UI。为了支持两个版本的 UI 的插件视图和链接以保持向后兼容性,请将字段 appbuilder_views 和 appbuilder_menu_items 添加到 AirflowTestPlugin 类中。