- BashOperator
- PythonOperator
- Google 云平台 Operators(执行器)
- GoogleCloudStorageToBigQueryOperator
- GceInstanceStartOperator
- GceInstanceStopOperator
- GceSetMachineTypeOperator
- GcfFunctionDeleteOperator
- GcfFunctionDeployOperator
- CloudSqlInstanceDatabaseCreateOperator
- CloudSqlInstanceDatabaseDeleteOperator
- CloudSqlInstanceDatabasePatchOperator
- CloudSqlInstanceDeleteOperator
- CloudSqlInstanceCreateOperator
- CloudSqlInstancePatchOperator
operator(执行器)代表一个理想情况下是幂等的任务。operator(执行器)决定了 DAG 运行时实际执行的内容。
有关更多信息,请参阅Operators Concepts文档和Operators API Reference 。
BashOperator
使用BashOperator在Bash shell 中执行命令。
run_this = BashOperator(task_id='run_after_loop',bash_command='echo 1',dag=dag)
模板
您可以使用Jinja 模板来参数化bash_command参数。
also_run_this = BashOperator(task_id='also_run_this',bash_command='echo "run_id={{ run_id }} | dag_run={{ dag_run }}"',dag=dag,)
故障排除
找不到 Jinja 模板
在使用bash_command参数直接调用 Bash 脚本时,需要在脚本名称后添加空格。这是因为 Airflow 尝试将 Jinja 模板应用于一个失败的脚本。
t2 = BashOperator(task_id='bash_example',# 这将会出现`Jinja template not found`的错误# bash_command="/home/batcher/test.sh",# 在加了空格之后,这会正常工作bash_command="/home/batcher/test.sh ",dag=dag)
PythonOperator
使用PythonOperator执行 Python 回调。
def print_context ( ds , ** kwargs ):pprint ( kwargs )print ( ds )return 'Whatever you return gets printed in the logs'run_this = PythonOperator (task_id = 'print_the_context' ,provide_context = True ,python_callable = print_context ,dag = dag )
传递参数
使用op_args和op_kwargs参数将额外参数传递给 Python 的回调函数。
def my_sleeping_function(random_base):"""这是一个将在 DAG 执行体中运行的函数"""time.sleep(random_base)# Generate 10 sleeping tasks, sleeping from 0 to 4 seconds respectivelyfor i in range(5):task = PythonOperator(task_id='sleep_for_' + str(i),python_callable=my_sleeping_function,op_kwargs={'random_base': float(i) / 10},dag=dag,)run_this >> task
模板
当您将provide_context参数设置为True,Airflow 会传入一组额外的关键字参数:一个用于每个Jinja 模板变量和一个templates_dict参数。
templates_dict参数是模板化的,因此字典中的每个值都被评估为Jinja 模板。
Google 云平台 Operators(执行器)
GoogleCloudStorageToBigQueryOperator
使用GoogleCloudStorageToBigQueryOperator执行 BigQuery 加载作业。
GceInstanceStartOperator
允许启动一个已存在的 Google Compute Engine 实例。
在此示例中,参数值从 Airflow 变量中提取。此外,default_args字典用于将公共参数传递给单个 DAG 中的所有 operator(执行器)。
PROJECT_ID = models.Variable.get('PROJECT_ID', '')LOCATION = models.Variable.get('LOCATION', '')INSTANCE = models.Variable.get('INSTANCE', '')SHORT_MACHINE_TYPE_NAME = models.Variable.get('SHORT_MACHINE_TYPE_NAME', '')SET_MACHINE_TYPE_BODY = {'machineType': 'zones/{}/machineTypes/{}'.format(LOCATION, SHORT_MACHINE_TYPE_NAME)}default_args = {'start_date': airflow.utils.dates.days_ago(1)}
通过将所需的参数传递给构造函数来定义GceInstanceStartOperator。
gce_instance_start = GceInstanceStartOperator(project_id=PROJECT_ID,zone=LOCATION,resource_id=INSTANCE,task_id='gcp_compute_start_task')
GceInstanceStopOperator
允许停止一个已存在的 Google Compute Engine 实例。
参数定义请参阅上面的GceInstanceStartOperator。
通过将所需的参数传递给构造函数来定义GceInstanceStopOperator。
gce_instance_stop = GceInstanceStopOperator(project_id=PROJECT_ID,zone=LOCATION,resource_id=INSTANCE,task_id='gcp_compute_stop_task')
GceSetMachineTypeOperator
允许把一个已停止实例的机器类型改变至特定的类型。
参数定义请参阅上面的GceInstanceStartOperator。
通过将所需的参数传递给构造函数来定义GceSetMachineTypeOperator。
gce_set_machine_type = GceSetMachineTypeOperator(project_id=PROJECT_ID,zone=LOCATION,resource_id=INSTANCE,body=SET_MACHINE_TYPE_BODY,task_id='gcp_compute_set_machine_type')
GcfFunctionDeleteOperator
使用default_args字典来传递参数给 operator(执行器)。
PROJECT_ID = models.Variable.get('PROJECT_ID', '')LOCATION = models.Variable.get('LOCATION', '')ENTRYPOINT = models.Variable.get('ENTRYPOINT', '')# A fully-qualified name of the function to deleteFUNCTION_NAME = 'projects/{}/locations/{}/functions/{}'.format(PROJECT_ID, LOCATION,ENTRYPOINT)default_args = {'start_date': airflow.utils.dates.days_ago(1)}
使用GcfFunctionDeleteOperator来从 Google Cloud Functions 删除一个函数。
t1 = GcfFunctionDeleteOperator(task_id="gcf_delete_task",name=FUNCTION_NAME)
故障排除
如果你想要使用服务账号来运行或部署一个 operator(执行器),但得到了一个 403 禁止的错误,这意味着你的服务账号没有正确的 Cloud IAM 权限。
- 指定该服务账号为 Cloud Functions Developer 角色。
- 授权 Cloud Functions 的运行账户为 Cloud IAM Service Account User 角色。
使用 gcloud 分配 Cloud IAM 权限的典型方法如下所示。只需将您的 Google Cloud Platform 项目 ID 替换为 PROJECT_ID,将 SERVICE_ACCOUNT_EMAIL 替换为您的服务帐户的电子邮件 ID 即可。
gcloud iam service-accounts add-iam-policy-binding \PROJECT_ID@appspot.gserviceaccount.com \--member="serviceAccount:[SERVICE_ACCOUNT_EMAIL]" \--role="roles/iam.serviceAccountUser"
细节请参阅Adding the IAM service agent user role to the runtime service
GcfFunctionDeployOperator
使用GcfFunctionDeployOperator来从 Google Cloud Functions 部署一个函数。
以下 Airflow 变量示例显示了您可以使用的 default_args 的各种变体和组合。变量定义如下:
PROJECT_ID = models.Variable.get('PROJECT_ID', '')LOCATION = models.Variable.get('LOCATION', '')SOURCE_ARCHIVE_URL = models.Variable.get('SOURCE_ARCHIVE_URL', '')SOURCE_UPLOAD_URL = models.Variable.get('SOURCE_UPLOAD_URL', '')SOURCE_REPOSITORY = models.Variable.get('SOURCE_REPOSITORY', '')ZIP_PATH = models.Variable.get('ZIP_PATH', '')ENTRYPOINT = models.Variable.get('ENTRYPOINT', '')FUNCTION_NAME = 'projects/{}/locations/{}/functions/{}'.format(PROJECT_ID, LOCATION,ENTRYPOINT)RUNTIME = 'nodejs6'VALIDATE_BODY = models.Variable.get('VALIDATE_BODY', True)
使用这些变量,您可以定义请求的主体:
body = {"name": FUNCTION_NAME,"entryPoint": ENTRYPOINT,"runtime": RUNTIME,"httpsTrigger": {}}
创建 DAG 时,default_args 字典可用于传递正文和其他参数:
default_args = {'start_date': dates.days_ago(1),'project_id': PROJECT_ID,'location': LOCATION,'body': body,'validate_body': VALIDATE_BODY}
请注意,在上面的示例中,body 和 default_args 都是不完整的。根据设置的变量,如何传递源代码相关字段可能有不同的变体。目前,您可以传递 sourceArchiveUrl,sourceRepository 或 sourceUploadUrl,CloudFunction API 规范中所述。此外,default_args 可能包含 zip_path 参数,以在部署源代码之前运行上载源代码的额外步骤。在最后一种情况下,您还需要在正文中提供一个空的 sourceUploadUrl 参数。
基于上面定义的变量,此处显示了设置源代码相关字段的示例逻辑:
if SOURCE_ARCHIVE_URL:body['sourceArchiveUrl'] = SOURCE_ARCHIVE_URLelif SOURCE_REPOSITORY:body['sourceRepository'] = {'url': SOURCE_REPOSITORY}elif ZIP_PATH:body['sourceUploadUrl'] = ''default_args['zip_path'] = ZIP_PATHelif SOURCE_UPLOAD_URL:body['sourceUploadUrl'] = SOURCE_UPLOAD_URLelse:raise Exception("Please provide one of the source_code parameters")
创建 operator(执行器)的代码如下:
deploy_task = GcfFunctionDeployOperator(task_id="gcf_deploy_task",name=FUNCTION_NAME)
Troubleshooting
如果你想要使用服务账号来运行或部署一个 operator(执行器),但得到了一个 403 禁止的错误,这意味着你的服务账号没有正确的 Cloud IAM 权限。
- 指定该服务账号为 Cloud Functions Developer 角色。
- 授权 Cloud Functions 的运行账户为 Cloud IAM Service Account User 角色。
使用 gcloud 分配 Cloud IAM 权限的典型方法如下所示。只需将您的 Google Cloud Platform 项目 ID 替换为 PROJECT_ID,将 SERVICE_ACCOUNT_EMAIL 的替换为您的服务帐户的电子邮件 ID 即可。
gcloud iam service-accounts add-iam-policy-binding \PROJECT_ID@appspot.gserviceaccount.com \--member="serviceAccount:[SERVICE_ACCOUNT_EMAIL]" \--role="roles/iam.serviceAccountUser"
细节请参阅Adding the IAM service agent user role to the runtime service
如果您的函数的源代码位于 Google Source Repository 中,请确保您的服务帐户具有 Source Repository Viewer 角色,以便在必要时可以下载源代码。
CloudSqlInstanceDatabaseCreateOperator
在 Cloud SQL 实例中创建新数据库。
有关参数定义,请参阅上面的GceInstanceStartOperator。
通过将所需的参数传递给构造函数来定义CloudSqlInstanceDatabaseCreateOperator。
参数
示例 DAG 中的一些参数取自环境变量:
PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')INSTANCE_NAME = os.environ.get('INSTANCE_NAME', 'testinstance')DB_NAME = os.environ.get('DB_NAME', 'testdb')
使用 operator(执行器)
sql_db_create_task = CloudSqlInstanceDatabaseCreateOperator(project_id=PROJECT_ID,body=db_create_body,instance=INSTANCE_NAME,task_id='sql_db_create_task')
示例请求体:
db_create_body = {"instance": INSTANCE_NAME,"name": DB_NAME,"project": PROJECT_ID}
模版
template_fields = ('project_id', 'instance', 'gcp_conn_id', 'api_version')
更多信息
有关数据库插入,请参阅Google Cloud SQL API 文档。
CloudSqlInstanceDatabaseDeleteOperator
在 Cloud SQL 实例中删除数据库。
有关参数定义,请参阅CloudSqlInstanceDatabaseDeleteOperator。
参数
示例 DAG 中的一些参数取自环境变量:
PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')INSTANCE_NAME = os.environ.get('INSTANCE_NAME', 'testinstance')DB_NAME = os.environ.get('DB_NAME', 'testdb')
使用 operator(执行器)
sql_db_delete_task = CloudSqlInstanceDatabaseDeleteOperator(project_id=PROJECT_ID,instance=INSTANCE_NAME,database=DB_NAME,task_id='sql_db_delete_task')
模版
template_fields = ('project_id', 'instance', 'database', 'gcp_conn_id','api_version')
更多信息
有关数据库删除,请参阅Google Cloud SQL API 文档。
CloudSqlInstanceDatabasePatchOperator
使用修补程序语义更新包含有关 Cloud SQL 实例内数据库的信息的资源。请参阅: https://cloud.google.com/sql/docs/mysql/admin-api/how-tos/performance#patch
有关参数定义,请参阅CloudSqlInstanceDatabasePatchOperator。
参数
示例 DAG 中的一些参数取自环境变量:
PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')INSTANCE_NAME = os.environ.get('INSTANCE_NAME', 'testinstance')DB_NAME = os.environ.get('DB_NAME', 'testdb')
使用 operator(执行器)
sql_db_patch_task = CloudSqlInstanceDatabasePatchOperator(project_id=PROJECT_ID,body=db_patch_body,instance=INSTANCE_NAME,database=DB_NAME,task_id='sql_db_patch_task')
示例请求体:
db_patch_body = {"charset": "utf16","collation": "utf16_general_ci"}
模版
template_fields = ('project_id', 'instance', 'database', 'gcp_conn_id','api_version')
更多信息
有关数据库修改,请参阅Google Cloud SQL API 文档。
CloudSqlInstanceDeleteOperator
示例 DAG 中的一些参数取自环境变量:
PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')INSTANCE_NAME = os.environ.get('INSTANCE_NAME', 'testinstance')DB_NAME = os.environ.get('DB_NAME', 'testdb')
使用 operator(执行器)
sql_instance_delete_task = CloudSqlInstanceDeleteOperator(project_id=PROJECT_ID,instance=INSTANCE_NAME,task_id='sql_instance_delete_task')
模版
template_fields = ('project_id', 'instance', 'gcp_conn_id', 'api_version')
更多信息
有关删除,请参阅Google Cloud SQL API 文档。
CloudSqlInstanceCreateOperator
在 Google Cloud Platform 中创建新的 Cloud SQL 实例。
有关参数定义,请参阅CloudSqlInstanceCreateOperator。
如果存在具有相同名称的实例,则不会执行任何操作,并且 operator(执行器)将成功执行。
参数
示例 DAG 中的一些参数取自环境变量:
PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')INSTANCE_NAME = os.environ.get('INSTANCE_NAME', 'testinstance')DB_NAME = os.environ.get('DB_NAME', 'testdb')
定义实例的示例:
body = {"name": INSTANCE_NAME,"settings": {"tier": "db-n1-standard-1","backupConfiguration": {"binaryLogEnabled": True,"enabled": True,"startTime": "05:00"},"activationPolicy": "ALWAYS","dataDiskSizeGb": 30,"dataDiskType": "PD_SSD","databaseFlags": [],"ipConfiguration": {"ipv4Enabled": True,"requireSsl": True,},"locationPreference": {"zone": "europe-west4-a"},"maintenanceWindow": {"hour": 5,"day": 7,"updateTrack": "canary"},"pricingPlan": "PER_USE","replicationType": "ASYNCHRONOUS","storageAutoResize": False,"storageAutoResizeLimit": 0,"userLabels": {"my-key": "my-value"}},"databaseVersion": "MYSQL_5_7","region": "europe-west4",}
使用 operator(执行器)
sql_instance_create_task = CloudSqlInstanceCreateOperator(project_id=PROJECT_ID,body=body,instance=INSTANCE_NAME,task_id='sql_instance_create_task')
模版
template_fields = ('project_id', 'instance', 'gcp_conn_id', 'api_version')
更多信息
有关插入,请参阅Google Cloud SQL API 文档。
CloudSqlInstancePatchOperator
更新 Google Cloud Platform 中的 Cloud SQL 实例的设置(部分更新)。
有关参数定义,请参阅CloudSqlInstancePatchOperator。
这是部分更新,因此仅设置/更新正文中指定的设置的值。现有实例的其余部分将保持不变。
参数
示例 DAG 中的一些参数取自环境变量:
PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')INSTANCE_NAME = os.environ.get('INSTANCE_NAME', 'testinstance')DB_NAME = os.environ.get('DB_NAME', 'testdb')
定义实例的示例:
patch_body = {"name": INSTANCE_NAME,"settings": {"dataDiskSizeGb": 35,"maintenanceWindow": {"hour": 3,"day": 6,"updateTrack": "canary"},"userLabels": {"my-key-patch": "my-value-patch"}}}
使用 operator(执行器)
sql_instance_patch_task = CloudSqlInstancePatchOperator(project_id=PROJECT_ID,body=patch_body,instance=INSTANCE_NAME,task_id='sql_instance_patch_task')
模版
template_fields = ('project_id', 'instance', 'gcp_conn_id', 'api_version')
更多信息
有关部分更新,请参阅Google Cloud SQL API 文档。
