前置条件

安装 Datakit

  1. 点击 [集成] 模块, [Datakit],根据您的操作系统和系统类型选择合适的安装命令。

image.png

  1. 复制 Datakit 安装命令在需要被监控的服务器上直接运行。
  • 安装目录 /usr/local/datakit/
  • 日志目录 /var/log/datakit/
  • 主配置文件 /usr/local/datakit/conf.d/datakit.conf
  • 插件配置目录 /usr/local/datakit/conf.d/

    安装 Function

  1. 点击 [集成] 模块, [Funciton],下载安装包并执行安装命令

image.png

  1. 安装完成后,使用浏览器访问 http://服务器IP地址:8088 进行初始化操作界面

image.png

  1. 使用默认用户名/密码 admin 登录系统

image.png

RAM 访问控制

  1. 登录 RAM 控制台 https://ram.console.aliyun.com/users
  2. 新建用户:人员管理 - 用户 - 创建用户

image.png

  1. 保存或下载 AccessKey ID 和 AccessKey Secret 的 CSV 文件 (配置文件会用到)
  2. 用户授权 (只读访问数据传输服务( DTS )权限)

image.png

脚本开发

阿里云 DTS 增量数据迁移

  1. 访问 [阿里云 DTS 产品文档 - 新版 API - 查询 DTS 任务详情],进入调试模式
  2. 找到 SDK 依赖信息 - 新版 SDK,复制 SDK 安装命令

image.png

  1. 管理 - 实验性功能 - 开启 PIP 工具模块

image.png

  1. 安装阿里云 SDK 依赖包

image.png

  1. 新建脚本集,添加脚本

image.png

  1. 编写代码,需要填入 AccessKey ID,AccessKey Secret,Region

    参考文档 :

import sys import json from typing import List

from alibabacloud_dts20200101.client import Client as Dts20200101Client from alibabacloud_tea_openapi import models as open_api_models from alibabacloud_dts20200101 import models as dts_20200101_models

阿里云 sdk

class Sample: def init(self): pass

  1. @staticmethod
  2. def create_client(
  3. access_key_id: str,
  4. access_key_secret: str,
  5. ) -> Dts20200101Client:
  6. """
  7. 使用AK&SK初始化账号Client
  8. @param access_key_id:
  9. @param access_key_secret:
  10. @return: Client
  11. @throws Exception
  12. """
  13. config = open_api_models.Config(
  14. # 您的AccessKey ID,
  15. access_key_id=access_key_id,
  16. # 您的AccessKey Secret,
  17. access_key_secret=access_key_secret
  18. )
  19. # 访问的域名
  20. config.endpoint = 'dts.cn-hangzhou.aliyuncs.com'
  21. return Dts20200101Client(config)
  22. @staticmethod
  23. def main(
  24. args: List[str],
  25. ) -> None:
  26. client = Sample.create_client('accessKeyId', 'accessKeySecret')
  27. describe_dts_jobs_request = dts_20200101_models.DescribeDtsJobsRequest(region='cn-hangzhou')
  28. client.describe_dts_jobs(describe_dts_jobs_request)

@DFF.API(‘dts_demo’, timeout=300) def dts_demo():

# 实例化sample
sample = Sample()
client = sample.create_client(
    access_key_id="AccessKey ID",
    access_key_secret="AccessKey Secret"
)
describe_dts_jobs_request = dts_20200101_models.DescribeDtsJobsRequest(
# DTS实例的任务类型。MIGRATION;SYNC;SUBSCRIBE。
        region='region',job_type='MIGRATION'
    )
response = client.describe_dts_jobs(describe_dts_jobs_request).to_map()
dtsjoblist = response['body']['DtsJobList'][0]
dts_instanceid = dtsjoblist['DtsInstanceID']
dts_jobname = dtsjoblist['DtsJobName']
# 增量数据迁移或同步的状态指标
datasynchronization = response['body']['DtsJobList'][0]['DataSynchronizationStatus']
dts_status = datasynchronization['Status']
dts_percent = datasynchronization['Percent']

# 行协议数据
points = [
    {'measurement': 'aliyun_api_dts',
     'tags': {'instanceId': dts_instanceid,'jobname': dts_jobname}, 
     'fields': {'dts_status': dts_status,'dts_percent':dts_percent}
    }
]
write_metrics(points)

def write_metrics(points: list): datakit = DFF.SRC(‘datakit’) res = datakit.write_metric_many(points)


7. 主函数前添加装饰器 (导出函数为 HTTP API 接口)

@DFF.API(‘名称’, timeout=300) ```

  1. 发布脚本

image.png

  1. 添加定时任务 (数据采集频率),管理 - 自动触发配置 - 选择定时时间

image.png

  1. 添加完成后可以看到所有定时任务

image.png

  1. 查看数据指标 - DataFlux - [指标]

image.png

  1. 添加[异常检测库] - DTS 迁移异常规则

image.png

  1. 阿里云 DTS 增量数据迁移失败触发异常事件

image.png