前置条件
安装 Datakit
- 点击 [集成] 模块, [Datakit],根据您的操作系统和系统类型选择合适的安装命令。

- 复制 Datakit 安装命令在需要被监控的服务器上直接运行。
- 安装目录 /usr/local/datakit/
- 日志目录 /var/log/datakit/
- 主配置文件 /usr/local/datakit/conf.d/datakit.conf
- 插件配置目录 /usr/local/datakit/conf.d/
安装 Function
- 点击 [集成] 模块, [Funciton],下载安装包并执行安装命令

- 安装完成后,使用浏览器访问 http://服务器IP地址:8088 进行初始化操作界面

- 使用默认用户名/密码 admin 登录系统
RAM 访问控制
- 登录 RAM 控制台 https://ram.console.aliyun.com/users
- 新建用户:人员管理 - 用户 - 创建用户

- 保存或下载 AccessKey ID 和 AccessKey Secret 的 CSV 文件 (配置文件会用到)
- 用户授权 (只读访问数据传输服务( DTS )权限)
脚本开发
阿里云 DTS 增量数据迁移
- 访问 [阿里云 DTS 产品文档 - 新版 API - 查询 DTS 任务详情],进入调试模式
- 找到 SDK 依赖信息 - 新版 SDK,复制 SDK 安装命令

- 管理 - 实验性功能 - 开启 PIP 工具模块

- 安装阿里云 SDK 依赖包

- 新建脚本集,添加脚本

编写代码,需要填入 AccessKey ID,AccessKey Secret,Region
参考文档 :
- [阿里云 DTS 产品文档 - 新版 API - 查询 DTS 任务详情]
- [Function DataKit 数据对接]
```
-- coding: utf-8 --
import sys import json from typing import List
from alibabacloud_dts20200101.client import Client as Dts20200101Client from alibabacloud_tea_openapi import models as open_api_models from alibabacloud_dts20200101 import models as dts_20200101_models
阿里云 sdk
class Sample: def init(self): pass
@staticmethoddef create_client(access_key_id: str,access_key_secret: str,) -> Dts20200101Client:"""使用AK&SK初始化账号Client@param access_key_id:@param access_key_secret:@return: Client@throws Exception"""config = open_api_models.Config(# 您的AccessKey ID,access_key_id=access_key_id,# 您的AccessKey Secret,access_key_secret=access_key_secret)# 访问的域名config.endpoint = 'dts.cn-hangzhou.aliyuncs.com'return Dts20200101Client(config)@staticmethoddef main(args: List[str],) -> None:client = Sample.create_client('accessKeyId', 'accessKeySecret')describe_dts_jobs_request = dts_20200101_models.DescribeDtsJobsRequest(region='cn-hangzhou')client.describe_dts_jobs(describe_dts_jobs_request)
@DFF.API(‘dts_demo’, timeout=300) def dts_demo():
# 实例化sample
sample = Sample()
client = sample.create_client(
access_key_id="AccessKey ID",
access_key_secret="AccessKey Secret"
)
describe_dts_jobs_request = dts_20200101_models.DescribeDtsJobsRequest(
# DTS实例的任务类型。MIGRATION;SYNC;SUBSCRIBE。
region='region',job_type='MIGRATION'
)
response = client.describe_dts_jobs(describe_dts_jobs_request).to_map()
dtsjoblist = response['body']['DtsJobList'][0]
dts_instanceid = dtsjoblist['DtsInstanceID']
dts_jobname = dtsjoblist['DtsJobName']
# 增量数据迁移或同步的状态指标
datasynchronization = response['body']['DtsJobList'][0]['DataSynchronizationStatus']
dts_status = datasynchronization['Status']
dts_percent = datasynchronization['Percent']
# 行协议数据
points = [
{'measurement': 'aliyun_api_dts',
'tags': {'instanceId': dts_instanceid,'jobname': dts_jobname},
'fields': {'dts_status': dts_status,'dts_percent':dts_percent}
}
]
write_metrics(points)
def write_metrics(points: list): datakit = DFF.SRC(‘datakit’) res = datakit.write_metric_many(points)
7. 主函数前添加装饰器 (导出函数为 HTTP API 接口)
@DFF.API(‘名称’, timeout=300) ```
- 发布脚本

- 添加定时任务 (数据采集频率),管理 - 自动触发配置 - 选择定时时间

- 添加完成后可以看到所有定时任务

- 查看数据指标 - DataFlux - [指标]

- 添加[异常检测库] - DTS 迁移异常规则

- 阿里云 DTS 增量数据迁移失败触发异常事件

