After installation, if the environment runs Python 3, you need to modify the three Python files under datax/bin.

GitHub

https://github.com/alibaba/DataX
https://github.com/alibaba/DataX/blob/master/introduction.md
https://github.com/alibaba/DataX/blob/master/userGuid.md

System requirements

  • Linux
  • JDK (1.8 or above; 1.8 recommended)
  • Python (2 or 3)
  • Apache Maven 3.x (only needed to compile DataX from source)

Related installation

JDK installation

https://www.oracle.com/java/technologies/downloads/archive/
Package notes:
Linux x86 RPM Package // for 32-bit CentOS / RedHat (Linux)
Linux x64 RPM Package // for 64-bit CentOS / RedHat (Linux)
Linux x86 Compressed Archive // for 32-bit Linux
Linux x64 Compressed Archive // for 64-bit Linux

Install the JDK

    mkdir /usr/local/jdk

Upload the JDK archive and extract it:

    tar -zxvf jdk-8u341-linux-x64.tar.gz

Configure global environment variables

Edit /etc/profile with vim (vi or any other editor also works) and append the following at the end of the file, adjusting the paths to your actual installation:

    vim /etc/profile

    export JAVA_HOME=/usr/local/jdk/jdk1.8.0_341/
    export JRE_HOME=$JAVA_HOME/jre
    export CLASSPATH=.:$JAVA_HOME/lib:$JRE_HOME/lib:$CLASSPATH
    export PATH=$JAVA_HOME/bin:$JRE_HOME/bin:$PATH

:::warning JAVA_HOME is the JDK 8 installation directory. :::
Note that JRE_HOME must be defined before it is referenced in CLASSPATH and PATH.

Reload the profile

    source /etc/profile

Check the versions

    java -version
    javac -version

JDK 8 is now installed.

Install Python

Check whether Python is already installed:

    python3 --version

When Python 3.x is installed on Linux, the interpreter is invoked as python3, not python, which is why the commands below call python3 explicitly.
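
If you want to confirm which interpreter a command maps to before touching the DataX scripts, you can check from inside Python itself (a convenience snippet, not part of DataX):

```python
# Print the running interpreter's version; run this with the same command
# you will later use for datax.py (python3 here).
import sys

print(sys.version)                    # e.g. "3.6.8 (default, ...)"
print(sys.version_info >= (3, 0))     # True -> the bin/ scripts need the Python 3 fixes below
```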

Install DataX

Download and extract

    mkdir -p /usr/local/datax/ && cd /usr/local/datax/

Online download:

    wget http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
    tar -zxvf datax.tar.gz -C /usr/local/

Manual download: :::warning Download the DataX toolkit directly from the DataX download page, upload the archive, and extract it. The directory used after extraction in this guide is /usr/local/datax. :::

Delete hidden files

The hidden `._*` files shipped in the tarball must be deleted (important), otherwise DataX fails at startup while loading plugins:

    rm -rf /usr/local/datax/plugin/*/._*
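
If you script the installation, the same cleanup can be done from Python; a minimal sketch equivalent to the rm command above, assuming the /usr/local/datax path used in this guide:

```python
# Remove macOS "._*" metadata files from every DataX plugin directory.
# Equivalent to: rm -rf /usr/local/datax/plugin/*/._*
import glob
import os

for path in glob.glob("/usr/local/datax/plugin/*/._*"):
    print("removing", path)
    os.remove(path)
```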

Verify the installation

    cd /usr/local/datax/bin
    python3 datax.py ../job/job.json
Output:

```bash
2022-10-08 21:43:34.138 [job-0] INFO  JobContainer - PerfTrace not enable!
2022-10-08 21:43:34.139 [job-0] INFO  StandAloneJobContainerCommunicator - Total 100000 records, 2600000 bytes | Speed 253.91KB/s, 10000 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.071s | All Task WaitReaderTime 0.094s | Percentage 100.00%
2022-10-08 21:43:34.140 [job-0] INFO  JobContainer -
任务启动时刻 : 2022-10-08 21:43:24
任务结束时刻 : 2022-10-08 21:43:34
任务总计耗时 :                 10s
任务平均流量 :          253.91KB/s
记录写入速度 :          10000rec/s
读出记录总数 :              100000
读写失败总数 :                   0
```

Modify the Python files

After installation, if the environment is Python 3, modify the three Python files under datax/bin.
[Modified files](https://github.com/WeiYe-Jing/datax-web/tree/master/doc/datax-web/datax-python3)
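
The files linked above are already Python 3 compatible, so the simplest route is to overwrite datax.py, dxprof.py and perftrace.py with them. If you prefer to patch by hand, the edits are of the kind sketched below (illustrative Python 2 to Python 3 changes, not the exact upstream diff):

```python
# Typical Python 2 -> Python 3 changes needed in the bin/ scripts (illustrative only).
import sys

# Python 2: print >> sys.stderr, "[Error] ..."
print("[Error] example message", file=sys.stderr)   # Python 3 print() with file=

# Python 2: except Exception, e:
try:
    raise ValueError("demo")
except Exception as e:        # Python 3 "as" syntax
    print(e)

# Python 2: reload(sys); sys.setdefaultencoding('utf8')
#   -> delete these lines on Python 3; the default encoding is already UTF-8.

# Python 2: choice = raw_input().lower()
#   -> raw_input() was renamed to input() in Python 3:
# choice = input().lower()
```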
datax.py:

```python
  5. #!/usr/bin/env python
  6. # -*- coding:utf-8 -*-
  7. import sys
  8. import os
  9. import signal
  10. import subprocess
  11. import time
  12. import re
  13. import socket
  14. import json
  15. from optparse import OptionParser
  16. from optparse import OptionGroup
  17. from string import Template
  18. import codecs
  19. import platform
  20. def isWindows():
  21. return platform.system() == 'Windows'
  22. DATAX_HOME = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
  23. DATAX_VERSION = 'DATAX-OPENSOURCE-3.0'
  24. if isWindows():
  25. codecs.register(lambda name: name == 'cp65001' and codecs.lookup('utf-8') or None)
  26. CLASS_PATH = ("%s/lib/*") % (DATAX_HOME)
  27. else:
  28. CLASS_PATH = ("%s/lib/*:.") % (DATAX_HOME)
  29. LOGBACK_FILE = ("%s/conf/logback.xml") % (DATAX_HOME)
  30. DEFAULT_JVM = "-Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=%s/log" % (DATAX_HOME)
  31. DEFAULT_PROPERTY_CONF = "-Dfile.encoding=UTF-8 -Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener -Djava.security.egd=file:///dev/urandom -Ddatax.home=%s -Dlogback.configurationFile=%s" % (
  32. DATAX_HOME, LOGBACK_FILE)
  33. ENGINE_COMMAND = "java -server ${jvm} %s -classpath %s ${params} com.alibaba.datax.core.Engine -mode ${mode} -jobid ${jobid} -job ${job}" % (
  34. DEFAULT_PROPERTY_CONF, CLASS_PATH)
  35. REMOTE_DEBUG_CONFIG = "-Xdebug -Xrunjdwp:transport=dt_socket,server=y,address=9999"
  36. RET_STATE = {
  37. "KILL": 143,
  38. "FAIL": -1,
  39. "OK": 0,
  40. "RUN": 1,
  41. "RETRY": 2
  42. }
  43. def getLocalIp():
  44. try:
  45. return socket.gethostbyname(socket.getfqdn(socket.gethostname()))
  46. except:
  47. return "Unknown"
  48. def suicide(signum, e):
  49. global child_process
  50. print >> sys.stderr, "[Error] DataX receive unexpected signal %d, starts to suicide." % (signum)
  51. if child_process:
  52. child_process.send_signal(signal.SIGQUIT)
  53. time.sleep(1)
  54. child_process.kill()
  55. print >> sys.stderr, "DataX Process was killed ! you did ?"
  56. sys.exit(RET_STATE["KILL"])
  57. def register_signal():
  58. if not isWindows():
  59. global child_process
  60. signal.signal(2, suicide)
  61. signal.signal(3, suicide)
  62. signal.signal(15, suicide)
  63. def getOptionParser():
  64. usage = "usage: %prog [options] job-url-or-path"
  65. parser = OptionParser(usage=usage)
  66. prodEnvOptionGroup = OptionGroup(parser, "Product Env Options",
  67. "Normal user use these options to set jvm parameters, job runtime mode etc. "
  68. "Make sure these options can be used in Product Env.")
  69. prodEnvOptionGroup.add_option("-j", "--jvm", metavar="<jvm parameters>", dest="jvmParameters", action="store",
  70. default=DEFAULT_JVM, help="Set jvm parameters if necessary.")
  71. prodEnvOptionGroup.add_option("--jobid", metavar="<job unique id>", dest="jobid", action="store", default="-1",
  72. help="Set job unique id when running by Distribute/Local Mode.")
  73. prodEnvOptionGroup.add_option("-m", "--mode", metavar="<job runtime mode>",
  74. action="store", default="standalone",
  75. help="Set job runtime mode such as: standalone, local, distribute. "
  76. "Default mode is standalone.")
  77. prodEnvOptionGroup.add_option("-p", "--params", metavar="<parameter used in job config>",
  78. action="store", dest="params",
  79. help='Set job parameter, eg: the source tableName you want to set it by command, '
  80. 'then you can use like this: -p"-DtableName=your-table-name", '
  81. 'if you have mutiple parameters: -p"-DtableName=your-table-name -DcolumnName=your-column-name".'
  82. 'Note: you should config in you job tableName with ${tableName}.')
  83. prodEnvOptionGroup.add_option("-r", "--reader", metavar="<parameter used in view job config[reader] template>",
  84. action="store", dest="reader",type="string",
  85. help='View job config[reader] template, eg: mysqlreader,streamreader')
  86. prodEnvOptionGroup.add_option("-w", "--writer", metavar="<parameter used in view job config[writer] template>",
  87. action="store", dest="writer",type="string",
  88. help='View job config[writer] template, eg: mysqlwriter,streamwriter')
  89. parser.add_option_group(prodEnvOptionGroup)
  90. devEnvOptionGroup = OptionGroup(parser, "Develop/Debug Options",
  91. "Developer use these options to trace more details of DataX.")
  92. devEnvOptionGroup.add_option("-d", "--debug", dest="remoteDebug", action="store_true",
  93. help="Set to remote debug mode.")
  94. devEnvOptionGroup.add_option("--loglevel", metavar="<log level>", dest="loglevel", action="store",
  95. default="info", help="Set log level such as: debug, info, all etc.")
  96. parser.add_option_group(devEnvOptionGroup)
  97. return parser
  98. def generateJobConfigTemplate(reader, writer):
  99. readerRef = "Please refer to the %s document:\n https://github.com/alibaba/DataX/blob/master/%s/doc/%s.md \n" % (reader,reader,reader)
  100. writerRef = "Please refer to the %s document:\n https://github.com/alibaba/DataX/blob/master/%s/doc/%s.md \n " % (writer,writer,writer)
  101. print (readerRef)
  102. print (writerRef)
  103. jobGuid = 'Please save the following configuration as a json file and use\n python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json \nto run the job.\n'
  104. print (jobGuid)
  105. jobTemplate={
  106. "job": {
  107. "setting": {
  108. "speed": {
  109. "channel": ""
  110. }
  111. },
  112. "content": [
  113. {
  114. "reader": {},
  115. "writer": {}
  116. }
  117. ]
  118. }
  119. }
  120. readerTemplatePath = "%s/plugin/reader/%s/plugin_job_template.json" % (DATAX_HOME,reader)
  121. writerTemplatePath = "%s/plugin/writer/%s/plugin_job_template.json" % (DATAX_HOME,writer)
  122. try:
  123. readerPar = readPluginTemplate(readerTemplatePath);
  124. except Exception as e:
  125. print ("Read reader[%s] template error: can\'t find file %s" % (reader,readerTemplatePath))
  126. try:
  127. writerPar = readPluginTemplate(writerTemplatePath);
  128. except Exception as e:
  129. print ("Read writer[%s] template error: : can\'t find file %s" % (writer,writerTemplatePath))
  130. jobTemplate['job']['content'][0]['reader'] = readerPar;
  131. jobTemplate['job']['content'][0]['writer'] = writerPar;
  132. print (json.dumps(jobTemplate, indent=4, sort_keys=True))
  133. def readPluginTemplate(plugin):
  134. with open(plugin, 'r') as f:
  135. return json.load(f)
  136. def isUrl(path):
  137. if not path:
  138. return False
  139. assert (isinstance(path, str))
  140. m = re.match(r"^http[s]?://\S+\w*", path.lower())
  141. if m:
  142. return True
  143. else:
  144. return False
  145. def buildStartCommand(options, args):
  146. commandMap = {}
  147. tempJVMCommand = DEFAULT_JVM
  148. if options.jvmParameters:
  149. tempJVMCommand = tempJVMCommand + " " + options.jvmParameters
  150. if options.remoteDebug:
  151. tempJVMCommand = tempJVMCommand + " " + REMOTE_DEBUG_CONFIG
  152. print ('local ip: ', getLocalIp())
  153. if options.loglevel:
  154. tempJVMCommand = tempJVMCommand + " " + ("-Dloglevel=%s" % (options.loglevel))
  155. if options.mode:
  156. commandMap["mode"] = options.mode
157. # jobResource may be a URL, or a local file path (relative or absolute)
jobResource = args[0]
  158. if not isUrl(jobResource):
  159. jobResource = os.path.abspath(jobResource)
  160. if jobResource.lower().startswith("file://"):
  161. jobResource = jobResource[len("file://"):]
  162. jobParams = ("-Dlog.file.name=%s") % (jobResource[-20:].replace('/', '_').replace('.', '_'))
  163. if options.params:
  164. jobParams = jobParams + " " + options.params
  165. if options.jobid:
  166. commandMap["jobid"] = options.jobid
  167. commandMap["jvm"] = tempJVMCommand
  168. commandMap["params"] = jobParams
  169. commandMap["job"] = jobResource
  170. return Template(ENGINE_COMMAND).substitute(**commandMap)
  171. def printCopyright():
  172. print ('''
  173. DataX (%s), From Alibaba !
  174. Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.
  175. ''' % DATAX_VERSION)
  176. sys.stdout.flush()
  177. if __name__ == "__main__":
  178. printCopyright()
  179. parser = getOptionParser()
  180. options, args = parser.parse_args(sys.argv[1:])
  181. if options.reader is not None and options.writer is not None:
  182. generateJobConfigTemplate(options.reader,options.writer)
  183. sys.exit(RET_STATE['OK'])
  184. if len(args) != 1:
  185. parser.print_help()
  186. sys.exit(RET_STATE['FAIL'])
  187. startCommand = buildStartCommand(options, args)
  188. # print startCommand
  189. child_process = subprocess.Popen(startCommand, shell=True)
  190. register_signal()
  191. (stdout, stderr) = child_process.communicate()
  192. sys.exit(child_process.returncode)
```

dxprof.py:

```python
#! /usr/bin/env python
  2. # vim: set expandtab tabstop=4 shiftwidth=4 foldmethod=marker nu:
  3. import re
  4. import sys
  5. import time
  6. REG_SQL_WAKE = re.compile(r'Begin\s+to\s+read\s+record\s+by\s+Sql', re.IGNORECASE)
  7. REG_SQL_DONE = re.compile(r'Finished\s+read\s+record\s+by\s+Sql', re.IGNORECASE)
  8. REG_SQL_PATH = re.compile(r'from\s+(\w+)(\s+where|\s*$)', re.IGNORECASE)
  9. REG_SQL_JDBC = re.compile(r'jdbcUrl:\s*\[(.+?)\]', re.IGNORECASE)
  10. REG_SQL_UUID = re.compile(r'(\d+\-)+reader')
  11. REG_COMMIT_UUID = re.compile(r'(\d+\-)+writer')
  12. REG_COMMIT_WAKE = re.compile(r'begin\s+to\s+commit\s+blocks', re.IGNORECASE)
  13. REG_COMMIT_DONE = re.compile(r'commit\s+blocks\s+ok', re.IGNORECASE)
  14. # {{{ function parse_timestamp() #
  15. def parse_timestamp(line):
  16. try:
  17. ts = int(time.mktime(time.strptime(line[0:19], '%Y-%m-%d %H:%M:%S')))
  18. except:
  19. ts = 0
  20. return ts
  21. # }}} #
  22. # {{{ function parse_query_host() #
  23. def parse_query_host(line):
  24. ori = REG_SQL_JDBC.search(line)
  25. if (not ori):
  26. return ''
  27. ori = ori.group(1).split('?')[0]
  28. off = ori.find('@')
  29. if (off > -1):
  30. ori = ori[off+1:len(ori)]
  31. else:
  32. off = ori.find('//')
  33. if (off > -1):
  34. ori = ori[off+2:len(ori)]
  35. return ori.lower()
  36. # }}} #
  37. # {{{ function parse_query_table() #
  38. def parse_query_table(line):
  39. ori = REG_SQL_PATH.search(line)
  40. return (ori and ori.group(1).lower()) or ''
  41. # }}} #
  42. # {{{ function parse_reader_task() #
  43. def parse_task(fname):
  44. global LAST_SQL_UUID
  45. global LAST_COMMIT_UUID
  46. global DATAX_JOBDICT
  47. global DATAX_JOBDICT_COMMIT
  48. global UNIXTIME
  49. LAST_SQL_UUID = ''
  50. DATAX_JOBDICT = {}
  51. LAST_COMMIT_UUID = ''
  52. DATAX_JOBDICT_COMMIT = {}
  53. UNIXTIME = int(time.time())
  54. with open(fname, 'r') as f:
  55. for line in f.readlines():
  56. line = line.strip()
  57. if (LAST_SQL_UUID and (LAST_SQL_UUID in DATAX_JOBDICT)):
  58. DATAX_JOBDICT[LAST_SQL_UUID]['host'] = parse_query_host(line)
  59. LAST_SQL_UUID = ''
  60. if line.find('CommonRdbmsReader$Task') > 0:
  61. parse_read_task(line)
  62. elif line.find('commit blocks') > 0:
  63. parse_write_task(line)
  64. else:
  65. continue
  66. # }}} #
  67. # {{{ function parse_read_task() #
  68. def parse_read_task(line):
  69. ser = REG_SQL_UUID.search(line)
  70. if not ser:
  71. return
  72. LAST_SQL_UUID = ser.group()
  73. if REG_SQL_WAKE.search(line):
  74. DATAX_JOBDICT[LAST_SQL_UUID] = {
  75. 'stat' : 'R',
  76. 'wake' : parse_timestamp(line),
  77. 'done' : UNIXTIME,
  78. 'host' : parse_query_host(line),
  79. 'path' : parse_query_table(line)
  80. }
  81. elif ((LAST_SQL_UUID in DATAX_JOBDICT) and REG_SQL_DONE.search(line)):
  82. DATAX_JOBDICT[LAST_SQL_UUID]['stat'] = 'D'
  83. DATAX_JOBDICT[LAST_SQL_UUID]['done'] = parse_timestamp(line)
  84. # }}} #
  85. # {{{ function parse_write_task() #
  86. def parse_write_task(line):
  87. ser = REG_COMMIT_UUID.search(line)
  88. if not ser:
  89. return
  90. LAST_COMMIT_UUID = ser.group()
  91. if REG_COMMIT_WAKE.search(line):
  92. DATAX_JOBDICT_COMMIT[LAST_COMMIT_UUID] = {
  93. 'stat' : 'R',
  94. 'wake' : parse_timestamp(line),
  95. 'done' : UNIXTIME,
  96. }
  97. elif ((LAST_COMMIT_UUID in DATAX_JOBDICT_COMMIT) and REG_COMMIT_DONE.search(line)):
  98. DATAX_JOBDICT_COMMIT[LAST_COMMIT_UUID]['stat'] = 'D'
  99. DATAX_JOBDICT_COMMIT[LAST_COMMIT_UUID]['done'] = parse_timestamp(line)
  100. # }}} #
  101. # {{{ function result_analyse() #
  102. def result_analyse():
  103. def compare(a, b):
  104. return b['cost'] - a['cost']
  105. tasklist = []
  106. hostsmap = {}
  107. statvars = {'sum' : 0, 'cnt' : 0, 'svr' : 0, 'max' : 0, 'min' : int(time.time())}
  108. tasklist_commit = []
  109. statvars_commit = {'sum' : 0, 'cnt' : 0}
  110. for idx in DATAX_JOBDICT:
  111. item = DATAX_JOBDICT[idx]
  112. item['uuid'] = idx;
  113. item['cost'] = item['done'] - item['wake']
  114. tasklist.append(item);
  115. if (not (item['host'] in hostsmap)):
  116. hostsmap[item['host']] = 1
  117. statvars['svr'] += 1
  118. if (item['cost'] > -1 and item['cost'] < 864000):
  119. statvars['sum'] += item['cost']
  120. statvars['cnt'] += 1
  121. statvars['max'] = max(statvars['max'], item['done'])
  122. statvars['min'] = min(statvars['min'], item['wake'])
  123. for idx in DATAX_JOBDICT_COMMIT:
  124. itemc = DATAX_JOBDICT_COMMIT[idx]
  125. itemc['uuid'] = idx
  126. itemc['cost'] = itemc['done'] - itemc['wake']
  127. tasklist_commit.append(itemc)
  128. if (itemc['cost'] > -1 and itemc['cost'] < 864000):
  129. statvars_commit['sum'] += itemc['cost']
  130. statvars_commit['cnt'] += 1
  131. ttl = (statvars['max'] - statvars['min']) or 1
  132. idx = float(statvars['cnt']) / (statvars['sum'] or ttl)
  133. tasklist.sort(compare)
  134. for item in tasklist:
  135. print '%s\t%s.%s\t%s\t%s\t% 4d\t% 2.1f%%\t% .2f' %(item['stat'], item['host'], item['path'],
  136. time.strftime('%H:%M:%S', time.localtime(item['wake'])),
  137. (('D' == item['stat']) and time.strftime('%H:%M:%S', time.localtime(item['done']))) or '--',
  138. item['cost'], 100 * item['cost'] / ttl, idx * item['cost'])
  139. if (not len(tasklist) or not statvars['cnt']):
  140. return
  141. print '\n--- DataX Profiling Statistics ---'
  142. print '%d task(s) on %d server(s), Total elapsed %d second(s), %.2f second(s) per task in average' %(statvars['cnt'],
  143. statvars['svr'], statvars['sum'], float(statvars['sum']) / statvars['cnt'])
  144. print 'Actually cost %d second(s) (%s - %s), task concurrency: %.2f, tilt index: %.2f' %(ttl,
  145. time.strftime('%H:%M:%S', time.localtime(statvars['min'])),
  146. time.strftime('%H:%M:%S', time.localtime(statvars['max'])),
  147. float(statvars['sum']) / ttl, idx * tasklist[0]['cost'])
  148. idx_commit = float(statvars_commit['cnt']) / (statvars_commit['sum'] or ttl)
  149. tasklist_commit.sort(compare)
  150. print '%d task(s) done odps comit, Total elapsed %d second(s), %.2f second(s) per task in average, tilt index: %.2f' % (
  151. statvars_commit['cnt'],
  152. statvars_commit['sum'], float(statvars_commit['sum']) / statvars_commit['cnt'],
  153. idx_commit * tasklist_commit[0]['cost'])
  154. # }}} #
  155. if (len(sys.argv) < 2):
  156. print "Usage: %s filename" %(sys.argv[0])
  157. quit(1)
  158. else:
  159. parse_task(sys.argv[1])
  160. result_analyse()
```

perftrace.py:

```python
#!/usr/bin/env python
  2. # -*- coding:utf-8 -*-
  3. """
  4. Life's short, Python more.
  5. """
  6. import re
  7. import os
  8. import sys
  9. import json
  10. import uuid
  11. import signal
  12. import time
  13. import subprocess
  14. from optparse import OptionParser
  15. reload(sys)
  16. sys.setdefaultencoding('utf8')
  17. ##begin cli & help logic
  18. def getOptionParser():
  19. usage = getUsage()
  20. parser = OptionParser(usage = usage)
  21. #rdbms reader and writer
  22. parser.add_option('-r', '--reader', action='store', dest='reader', help='trace datasource read performance with specified !json! string')
  23. parser.add_option('-w', '--writer', action='store', dest='writer', help='trace datasource write performance with specified !json! string')
  24. parser.add_option('-c', '--channel', action='store', dest='channel', default='1', help='the number of concurrent sync thread, the default is 1')
  25. parser.add_option('-f', '--file', action='store', help='existing datax configuration file, include reader and writer params')
  26. parser.add_option('-t', '--type', action='store', default='reader', help='trace which side\'s performance, cooperate with -f --file params, need to be reader or writer')
  27. parser.add_option('-d', '--delete', action='store', default='true', help='delete temporary files, the default value is true')
  28. #parser.add_option('-h', '--help', action='store', default='true', help='print usage information')
  29. return parser
  30. def getUsage():
  31. return '''
  32. The following params are available for -r --reader:
  33. [these params is for rdbms reader, used to trace rdbms read performance, it's like datax's key]
  34. *datasourceType: datasource type, may be mysql|drds|oracle|ads|sqlserver|postgresql|db2 etc...
  35. *jdbcUrl: datasource jdbc connection string, mysql as a example: jdbc:mysql://ip:port/database
  36. *username: username for datasource
  37. *password: password for datasource
  38. *table: table name for read data
  39. column: column to be read, the default value is ['*']
  40. splitPk: the splitPk column of rdbms table
  41. where: limit the scope of the performance data set
  42. fetchSize: how many rows to be fetched at each communicate
  43. [these params is for stream reader, used to trace rdbms write performance]
  44. reader-sliceRecordCount: how man test data to mock(each channel), the default value is 10000
45. reader-column : stream reader while generate test data(type supports: string|long|date|double|bool|bytes; support constant value and random function), demo: [{"type":"string","value":"abc"},{"type":"string","random":"10,20"}]
  46. The following params are available for -w --writer:
  47. [these params is for rdbms writer, used to trace rdbms write performance, it's like datax's key]
  48. datasourceType: datasource type, may be mysql|drds|oracle|ads|sqlserver|postgresql|db2|ads etc...
  49. *jdbcUrl: datasource jdbc connection string, mysql as a example: jdbc:mysql://ip:port/database
  50. *username: username for datasource
  51. *password: password for datasource
  52. *table: table name for write data
  53. column: column to be writed, the default value is ['*']
  54. batchSize: how many rows to be storeed at each communicate, the default value is 512
  55. preSql: prepare sql to be executed before write data, the default value is ''
  56. postSql: post sql to be executed end of write data, the default value is ''
  57. url: required for ads, pattern is ip:port
  58. schme: required for ads, ads database name
  59. [these params is for stream writer, used to trace rdbms read performance]
  60. writer-print: true means print data read from source datasource, the default value is false
  61. The following params are available global control:
  62. -c --channel: the number of concurrent tasks, the default value is 1
  63. -f --file: existing completely dataX configuration file path
  64. -t --type: test read or write performance for a datasource, couble be reader or writer, in collaboration with -f --file
  65. -h --help: print help message
  66. some demo:
  67. perftrace.py --channel=10 --reader='{"jdbcUrl":"jdbc:mysql://127.0.0.1:3306/database", "username":"", "password":"", "table": "", "where":"", "splitPk":"", "writer-print":"false"}'
  68. perftrace.py --channel=10 --writer='{"jdbcUrl":"jdbc:mysql://127.0.0.1:3306/database", "username":"", "password":"", "table": "", "reader-sliceRecordCount": "10000", "reader-column": [{"type":"string","value":"abc"},{"type":"string","random":"10,20"}]}'
  69. perftrace.py --file=/tmp/datax.job.json --type=reader --reader='{"writer-print": "false"}'
  70. perftrace.py --file=/tmp/datax.job.json --type=writer --writer='{"reader-sliceRecordCount": "10000", "reader-column": [{"type":"string","value":"abc"},{"type":"string","random":"10,20"}]}'
  71. some example jdbc url pattern, may help:
  72. jdbc:oracle:thin:@ip:port:database
  73. jdbc:mysql://ip:port/database
  74. jdbc:sqlserver://ip:port;DatabaseName=database
  75. jdbc:postgresql://ip:port/database
  76. warn: ads url pattern is ip:port
  77. warn: test write performance will write data into your table, you can use a temporary table just for test.
  78. '''
  79. def printCopyright():
  80. DATAX_VERSION = 'UNKNOWN_DATAX_VERSION'
  81. print '''
  82. DataX Util Tools (%s), From Alibaba !
  83. Copyright (C) 2010-2016, Alibaba Group. All Rights Reserved.''' % DATAX_VERSION
  84. sys.stdout.flush()
  85. def yesNoChoice():
  86. yes = set(['yes','y', 'ye', ''])
  87. no = set(['no','n'])
  88. choice = raw_input().lower()
  89. if choice in yes:
  90. return True
  91. elif choice in no:
  92. return False
  93. else:
  94. sys.stdout.write("Please respond with 'yes' or 'no'")
  95. ##end cli & help logic
  96. ##begin process logic
  97. def suicide(signum, e):
  98. global childProcess
  99. print >> sys.stderr, "[Error] Receive unexpected signal %d, starts to suicide." % (signum)
  100. if childProcess:
  101. childProcess.send_signal(signal.SIGQUIT)
  102. time.sleep(1)
  103. childProcess.kill()
  104. print >> sys.stderr, "DataX Process was killed ! you did ?"
  105. sys.exit(-1)
  106. def registerSignal():
  107. global childProcess
  108. signal.signal(2, suicide)
  109. signal.signal(3, suicide)
  110. signal.signal(15, suicide)
  111. def fork(command, isShell=False):
  112. global childProcess
  113. childProcess = subprocess.Popen(command, shell = isShell)
  114. registerSignal()
  115. (stdout, stderr) = childProcess.communicate()
116. # block until the child process exits
childProcess.wait()
  117. return childProcess.returncode
  118. ##end process logic
  119. ##begin datax json generate logic
  120. #warn: if not '': -> true; if not None: -> true
  121. def notNone(obj, context):
  122. if not obj:
  123. raise Exception("Configuration property [%s] could not be blank!" % (context))
  124. def attributeNotNone(obj, attributes):
  125. for key in attributes:
  126. notNone(obj.get(key), key)
  127. def isBlank(value):
  128. if value is None or len(value.strip()) == 0:
  129. return True
  130. return False
  131. def parsePluginName(jdbcUrl, pluginType):
  132. import re
  133. #warn: drds
  134. name = 'pluginName'
  135. mysqlRegex = re.compile('jdbc:(mysql)://.*')
  136. if (mysqlRegex.match(jdbcUrl)):
  137. name = 'mysql'
  138. postgresqlRegex = re.compile('jdbc:(postgresql)://.*')
  139. if (postgresqlRegex.match(jdbcUrl)):
  140. name = 'postgresql'
  141. oracleRegex = re.compile('jdbc:(oracle):.*')
  142. if (oracleRegex.match(jdbcUrl)):
  143. name = 'oracle'
  144. sqlserverRegex = re.compile('jdbc:(sqlserver)://.*')
  145. if (sqlserverRegex.match(jdbcUrl)):
  146. name = 'sqlserver'
  147. db2Regex = re.compile('jdbc:(db2)://.*')
  148. if (db2Regex.match(jdbcUrl)):
  149. name = 'db2'
  150. return "%s%s" % (name, pluginType)
  151. def renderDataXJson(paramsDict, readerOrWriter = 'reader', channel = 1):
  152. dataxTemplate = {
  153. "job": {
  154. "setting": {
  155. "speed": {
  156. "channel": 1
  157. }
  158. },
  159. "content": [
  160. {
  161. "reader": {
  162. "name": "",
  163. "parameter": {
  164. "username": "",
  165. "password": "",
  166. "sliceRecordCount": "10000",
  167. "column": [
  168. "*"
  169. ],
  170. "connection": [
  171. {
  172. "table": [],
  173. "jdbcUrl": []
  174. }
  175. ]
  176. }
  177. },
  178. "writer": {
  179. "name": "",
  180. "parameter": {
  181. "print": "false",
  182. "connection": [
  183. {
  184. "table": [],
  185. "jdbcUrl": ''
  186. }
  187. ]
  188. }
  189. }
  190. }
  191. ]
  192. }
  193. }
  194. dataxTemplate['job']['setting']['speed']['channel'] = channel
  195. dataxTemplateContent = dataxTemplate['job']['content'][0]
  196. pluginName = ''
  197. if paramsDict.get('datasourceType'):
  198. pluginName = '%s%s' % (paramsDict['datasourceType'], readerOrWriter)
  199. elif paramsDict.get('jdbcUrl'):
  200. pluginName = parsePluginName(paramsDict['jdbcUrl'], readerOrWriter)
  201. elif paramsDict.get('url'):
  202. pluginName = 'adswriter'
  203. theOtherSide = 'writer' if readerOrWriter == 'reader' else 'reader'
  204. dataxPluginParamsContent = dataxTemplateContent.get(readerOrWriter).get('parameter')
  205. dataxPluginParamsContent.update(paramsDict)
  206. dataxPluginParamsContentOtherSide = dataxTemplateContent.get(theOtherSide).get('parameter')
  207. if readerOrWriter == 'reader':
  208. dataxTemplateContent.get('reader')['name'] = pluginName
  209. dataxTemplateContent.get('writer')['name'] = 'streamwriter'
  210. if paramsDict.get('writer-print'):
  211. dataxPluginParamsContentOtherSide['print'] = paramsDict['writer-print']
  212. del dataxPluginParamsContent['writer-print']
  213. del dataxPluginParamsContentOtherSide['connection']
  214. if readerOrWriter == 'writer':
  215. dataxTemplateContent.get('reader')['name'] = 'streamreader'
  216. dataxTemplateContent.get('writer')['name'] = pluginName
  217. if paramsDict.get('reader-column'):
  218. dataxPluginParamsContentOtherSide['column'] = paramsDict['reader-column']
  219. del dataxPluginParamsContent['reader-column']
  220. if paramsDict.get('reader-sliceRecordCount'):
  221. dataxPluginParamsContentOtherSide['sliceRecordCount'] = paramsDict['reader-sliceRecordCount']
  222. del dataxPluginParamsContent['reader-sliceRecordCount']
  223. del dataxPluginParamsContentOtherSide['connection']
  224. if paramsDict.get('jdbcUrl'):
  225. if readerOrWriter == 'reader':
  226. dataxPluginParamsContent['connection'][0]['jdbcUrl'].append(paramsDict['jdbcUrl'])
  227. else:
  228. dataxPluginParamsContent['connection'][0]['jdbcUrl'] = paramsDict['jdbcUrl']
  229. if paramsDict.get('table'):
  230. dataxPluginParamsContent['connection'][0]['table'].append(paramsDict['table'])
  231. traceJobJson = json.dumps(dataxTemplate, indent = 4)
  232. return traceJobJson
  233. def isUrl(path):
  234. if not path:
  235. return False
  236. if not isinstance(path, str):
  237. raise Exception('Configuration file path required for the string, you configure is:%s' % path)
  238. m = re.match(r"^http[s]?://\S+\w*", path.lower())
  239. if m:
  240. return True
  241. else:
  242. return False
  243. def readJobJsonFromLocal(jobConfigPath):
  244. jobConfigContent = None
  245. jobConfigPath = os.path.abspath(jobConfigPath)
  246. file = open(jobConfigPath)
  247. try:
  248. jobConfigContent = file.read()
  249. finally:
  250. file.close()
  251. if not jobConfigContent:
  252. raise Exception("Your job configuration file read the result is empty, please check the configuration is legal, path: [%s]\nconfiguration:\n%s" % (jobConfigPath, str(jobConfigContent)))
  253. return jobConfigContent
  254. def readJobJsonFromRemote(jobConfigPath):
  255. import urllib
  256. conn = urllib.urlopen(jobConfigPath)
  257. jobJson = conn.read()
  258. return jobJson
  259. def parseJson(strConfig, context):
  260. try:
  261. return json.loads(strConfig)
  262. except Exception, e:
  263. import traceback
  264. traceback.print_exc()
  265. sys.stdout.flush()
  266. print >> sys.stderr, '%s %s need in line with json syntax' % (context, strConfig)
  267. sys.exit(-1)
  268. def convert(options, args):
  269. traceJobJson = ''
  270. if options.file:
  271. if isUrl(options.file):
  272. traceJobJson = readJobJsonFromRemote(options.file)
  273. else:
  274. traceJobJson = readJobJsonFromLocal(options.file)
  275. traceJobDict = parseJson(traceJobJson, '%s content' % options.file)
  276. attributeNotNone(traceJobDict, ['job'])
  277. attributeNotNone(traceJobDict['job'], ['content'])
  278. attributeNotNone(traceJobDict['job']['content'][0], ['reader', 'writer'])
  279. attributeNotNone(traceJobDict['job']['content'][0]['reader'], ['name', 'parameter'])
  280. attributeNotNone(traceJobDict['job']['content'][0]['writer'], ['name', 'parameter'])
  281. if options.type == 'reader':
  282. traceJobDict['job']['content'][0]['writer']['name'] = 'streamwriter'
  283. if options.reader:
  284. traceReaderDict = parseJson(options.reader, 'reader config')
  285. if traceReaderDict.get('writer-print') is not None:
  286. traceJobDict['job']['content'][0]['writer']['parameter']['print'] = traceReaderDict.get('writer-print')
  287. else:
  288. traceJobDict['job']['content'][0]['writer']['parameter']['print'] = 'false'
  289. else:
  290. traceJobDict['job']['content'][0]['writer']['parameter']['print'] = 'false'
  291. elif options.type == 'writer':
  292. traceJobDict['job']['content'][0]['reader']['name'] = 'streamreader'
  293. if options.writer:
  294. traceWriterDict = parseJson(options.writer, 'writer config')
  295. if traceWriterDict.get('reader-column'):
  296. traceJobDict['job']['content'][0]['reader']['parameter']['column'] = traceWriterDict['reader-column']
  297. if traceWriterDict.get('reader-sliceRecordCount'):
  298. traceJobDict['job']['content'][0]['reader']['parameter']['sliceRecordCount'] = traceWriterDict['reader-sliceRecordCount']
  299. else:
  300. columnSize = len(traceJobDict['job']['content'][0]['writer']['parameter']['column'])
  301. streamReaderColumn = []
  302. for i in range(columnSize):
  303. streamReaderColumn.append({"type": "long", "random": "2,10"})
  304. traceJobDict['job']['content'][0]['reader']['parameter']['column'] = streamReaderColumn
  305. traceJobDict['job']['content'][0]['reader']['parameter']['sliceRecordCount'] = 10000
  306. else:
  307. pass#do nothing
  308. return json.dumps(traceJobDict, indent = 4)
  309. elif options.reader:
  310. traceReaderDict = parseJson(options.reader, 'reader config')
  311. return renderDataXJson(traceReaderDict, 'reader', options.channel)
  312. elif options.writer:
  313. traceWriterDict = parseJson(options.writer, 'writer config')
  314. return renderDataXJson(traceWriterDict, 'writer', options.channel)
  315. else:
  316. print getUsage()
  317. sys.exit(-1)
  318. #dataxParams = {}
  319. #for opt, value in options.__dict__.items():
  320. # dataxParams[opt] = value
  321. ##end datax json generate logic
  322. if __name__ == "__main__":
  323. printCopyright()
  324. parser = getOptionParser()
  325. options, args = parser.parse_args(sys.argv[1:])
  326. #print options, args
  327. dataxTraceJobJson = convert(options, args)
328. # generated from the MAC address, current timestamp and a random number, so the name is globally unique
dataxJobPath = os.path.join(os.getcwd(), "perftrace-" + str(uuid.uuid1()))
  329. jobConfigOk = True
  330. if os.path.exists(dataxJobPath):
  331. print "file already exists, truncate and rewrite it? %s" % dataxJobPath
  332. if yesNoChoice():
  333. jobConfigOk = True
  334. else:
  335. print "exit failed, because of file conflict"
  336. sys.exit(-1)
  337. fileWriter = open(dataxJobPath, 'w')
  338. fileWriter.write(dataxTraceJobJson)
  339. fileWriter.close()
  340. print "trace environments:"
  341. print "dataxJobPath: %s" % dataxJobPath
  342. dataxHomePath = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
  343. print "dataxHomePath: %s" % dataxHomePath
  344. dataxCommand = "%s %s" % (os.path.join(dataxHomePath, "bin", "datax.py"), dataxJobPath)
  345. print "dataxCommand: %s" % dataxCommand
  346. returncode = fork(dataxCommand, True)
  347. if options.delete == 'true':
  348. os.remove(dataxJobPath)
349. sys.exit(returncode)
```

DataX basic usage

View job templates

View the streamreader -> streamwriter template:

    python3 /usr/local/datax/bin/datax.py -r streamreader -w streamwriter

Output:

    DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
    Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.

    Please refer to the streamreader document:
        https://github.com/alibaba/DataX/blob/master/streamreader/doc/streamreader.md

    Please refer to the streamwriter document:
        https://github.com/alibaba/DataX/blob/master/streamwriter/doc/streamwriter.md

    Please save the following configuration as a json file and use
        python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json
    to run the job.

    {
        "job": {
            "content": [
                {
                    "reader": {
                        "name": "streamreader",
                        "parameter": {
                            "column": [],
                            "sliceRecordCount": ""
                        }
                    },
                    "writer": {
                        "name": "streamwriter",
                        "parameter": {
                            "encoding": "",
                            "print": true
                        }
                    }
                }
            ],
            "setting": {
                "speed": {
                    "channel": ""
                }
            }
        }
    }
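
If you would rather generate the job file from a script than edit JSON by hand, here is a small sketch that fills the template with the same values used in the manual steps below (the output path /usr/local/datax/job/test.json is simply the location this guide uses):

```python
# Fill the streamreader -> streamwriter template and write it to test.json.
import json

job = {
    "job": {
        "content": [{
            "reader": {
                "name": "streamreader",
                "parameter": {
                    "column": [
                        {"type": "string", "value": "Hello."},
                        {"type": "string", "value": "河北彭于晏"},
                    ],
                    "sliceRecordCount": "3",          # records generated per channel
                },
            },
            "writer": {
                "name": "streamwriter",
                "parameter": {"encoding": "utf-8", "print": True},
            },
        }],
        "setting": {"speed": {"channel": "2"}},       # 2 channels -> 3 * 2 = 6 records
    },
}

with open("/usr/local/datax/job/test.json", "w", encoding="utf-8") as f:
    json.dump(job, f, ensure_ascii=False, indent=4)
```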

Run a test job

Write a JSON file based on the template:

    cd /usr/local/datax/job

    cat <<END > test.json
    {
        "job": {
            "content": [
                {
                    "reader": {
                        "name": "streamreader",
                        "parameter": {
                            "column": [                      # columns to generate (* means all)
                                {
                                    "type": "string",
                                    "value": "Hello."
                                },
                                {
                                    "type": "string",
                                    "value": "河北彭于晏"
                                }
                            ],
                            "sliceRecordCount": "3"          # number of records per channel
                        }
                    },
                    "writer": {
                        "name": "streamwriter",
                        "parameter": {
                            "encoding": "utf-8",             # output encoding
                            "print": true
                        }
                    }
                }
            ],
            "setting": {
                "speed": {
                    "channel": "2"                           # concurrency (sliceRecordCount * channel = total records)
                }
            }
        }
    }

If you copy the block above, remove everything from `#` onward on each line; the comment-free version actually entered into the heredoc is:

    cat <<END > test.json
    {
        "job": {
            "content": [
                {
                    "reader": {
                        "name": "streamreader",
                        "parameter": {
                            "column": [
                                {
                                    "type": "string",
                                    "value": "Hello."
                                },
                                {
                                    "type": "string",
                                    "value": "河北彭于晏"
                                }
                            ],
                            "sliceRecordCount": "3"
                        }
                    },
                    "writer": {
                        "name": "streamwriter",
                        "parameter": {
                            "encoding": "utf-8",
                            "print": true
                        }
                    }
                }
            ],
            "setting": {
                "speed": {
                    "channel": "2"
                }
            }
        }
    }

Then type END on its own line to close the heredoc; test.json is now written.
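
Strict JSON allows neither # comments nor trailing commas, so it is worth validating the file before handing it to DataX; a small sanity check (plain Python, nothing DataX specific):

```python
# Fail fast if test.json is not valid JSON (leftover "#" comments, trailing commas, ...).
import json

with open("/usr/local/datax/job/test.json", encoding="utf-8") as f:
    job = json.load(f)        # raises json.JSONDecodeError on syntax problems

print(job["job"]["setting"]["speed"]["channel"])   # expect "2"
```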

Run the job to verify it:

    python3 /usr/local/datax/bin/datax.py test.json

Output: the record "Hello. 河北彭于晏" is printed 6 times in total (sliceRecordCount 3 x channel 2), followed by the usual job summary statistics.
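
Finally, if you want to drive this test job from another script and check whether it succeeded, the same subprocess pattern that perftrace.py uses above works; a minimal sketch:

```python
# Run the DataX job and propagate its exit code (same idea as fork() in perftrace.py).
import subprocess
import sys

cmd = "python3 /usr/local/datax/bin/datax.py /usr/local/datax/job/test.json"
proc = subprocess.Popen(cmd, shell=True)
proc.communicate()             # wait for the job to finish
sys.exit(proc.returncode)      # 0 means the job succeeded
```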