1. datax介绍

  1. datax 数据同步离线工具,将不同数据源的同步抽象为从源头数据源读取数据的Reader插件,以及向目标端写入数据的Writer插件,理论上DataX框架可以支持任意数据源类型的数据同步工作。同时DataX插件体系作为一套生态系统, 每接入一套新数据源该新加入的数据源即可实现和现有的数据源互通。
  2. 安装部署-[安装路径](http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz),只需要进行相应的安装解压到相应的路径即可。每次执行任务则是将使用`/datax/bin/data.py`文件来去执行相应的任务即可。
  3. DataX执行同步任务得时候,将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。
  • Reader:Reader为数据采集模块,负责采集数据源的数据,将数据发送给Framework。
  • Writer:Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
  • Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。

    2. 从MySQL到HDFS

    2.1 准备工作

    从Mysql到hdfs中,需要做以下工作:

  • 准备MySQL端的数据,建表

  • 准备hive端数据库和表,在同步之前没有相应的分区,因此还需要在hdfs的hive数据库路径下建立相应的分区 / 也可以将此步骤 和 后面的同步任务写在脚本中,定时运行即可。
  • 准备同步任务的json脚本
  • 同步之后,进行hive端的数据加载。

以下是同步任务的json脚本

  1. {
  2. "job": {
  3. "setting": {
  4. "speed": {
  5. "channel": 1
  6. },
  7. "errorLimit": {
  8. "record": 0
  9. }
  10. },
  11. "content": [{
  12. "reader": {
  13. "name": "mysqlreader",
  14. "parameter": {
  15. "username": "数据库用户名",
  16. "password": "数据库密码",
  17. "column": [
  18. "字段1", "字段2"
  19. ],
  20. "where": "同步条件",
  21. "splitPk": "id",
  22. "connection": [{
  23. "table": [
  24. "MySQL端同步表名"
  25. ],
  26. "jdbcUrl": [
  27. "jdbc:mysql://数据库IP:数据库端口/对应的数据库"
  28. ]
  29. }]
  30. }
  31. },
  32. "writer": {
  33. "name": "hdfswriter",
  34. "parameter": {
  35. "defaultFS": "hdfs://HDFS集群-IP:端口",
  36. "fileType": "text",
  37. "path": "hive端表对应的存储路径/dt=分区参数(分区按照实际情况可要可不要)",
  38. "fileName": "hive端表名$do_date.dat",
  39. "column": [{
  40. "name": "字段1",
  41. "type": "INT"
  42. },
  43. {
  44. "name": "字段2",
  45. "type": "STRING"
  46. }
  47. ],
  48. "writeMode": "append",
  49. "fieldDelimiter": "分割符"
  50. }
  51. }
  52. }]
  53. }
  54. }

2.2 参数说明

setting 参数说明

  1. 用来控制同步任务,包括并发通道`channel` 、字节流`byte` 、记录流`record` 、同步速度`speed`

reader参数说明

  1. jdbcUrl
  2. 描述:数据库地址。必填
  3. username
  4. 描述:数据源用户名
  5. password
  6. 描述: 数据源指定用户名的密码
  7. table
  8. 描述: 所选取的需要同步的表
  9. column
  10. 描述: 所配置的表中需要同步的列名集合
  11. splitPk
  12. 描述:MysqlReader进行数据抽取时,如果指定splitPk,表示用户希望使用splitPk代表的字段进行数据分片,DataX因此会启动并发任务进行数据同步,这样可以大大提供数据同步的效能。推荐splitPk用户使用表主键,因为表主键通常情况下比较均匀,因此切分出来的分片也不容易出现数据热点。splitPk仅支持整形数据切分,不支持浮点、字符串、日期等其他类型。如果用户指定其他非支持类型MysqlReader将报错
  13. where
  14. 描述:筛选条件,MysqlReader根据指定的columntablewhere条件拼接SQL,并根据这个SQL进行数据抽取。在实际业务场景中,往往会选择当天的数据进行同步,可以将where条件指定为gmt_create > $bizdate 。注意:不可以将where条件指定为limit 10limit不是SQL的合法where子句。 where条件可以有效地进行业务增量同步。如果不填写where语句,包括不提供wherekey或者valueDataX均视作同步全量数据。
  15. querySql
  16. 描述:在有些业务场景下,where这一配置项不足以描述所筛选的条件,用户 可以通过该配置型来自定义筛选SQL。当用户配置了这一项之后,DataX系统就会忽略tablecolumn这些配置型,直接使用这个配置项的内容对数据进行筛选,例如需要进行多表join后同步数据,使用select a,b from table_a join table_b on table_a.id = table_b.id 。当用户配置querySql时,MysqlReader直接忽略tablecolumnwhere条件的配置,querySql优先级大于tablecolumnwhere选项。

writer参数说明

  1. defaultFS
  2. 描述:Hadoop hdfs文件系统namenode节点地址。格式:hdfs://ip:端口;例如:hdfs://127.0.0.1:9000
  3. fileType
  4. 描述:文件的类型,目前只支持用户配置为"text""orc"
  5. path
  6. 描述:存储到Hadoop hdfs文件系统的路径信息
  7. fileName
  8. 描述:HdfsWriter写入时的文件名,实际执行时会在该文件名后添加随机的后缀作为每个线程写入实际文件名。
  9. column
  10. 描述:写入数据的字段,不支持对部分列写入。为与hive中表关联,需要指定表中所有字段名和字段类型,其中:name指定字段名,type指定字段类型。
  11. writeMode
  12. 描述:hdfswriter写入前数据清理处理模式:
  13. § append,写入前不做任何处理,DataX hdfswriterfilename写入,并保证文件名不冲突。
  14. § nonConflict,如果目录下有fileName前缀的文件,直接报错。
  15. fieldDelimiter
  16. 描述:hdfswriter写入时的字段分隔符,需要用户保证与创建的Hive表的字段分隔符一致,否则无法在Hive表中查到数据

2.3 执行任务

在做好所有的准备工作的时候,开始执行任务:

  1. #!/bin/bash
  2. source /etc/profile
  3. source activate python27
  4. do_date="`date +%Y-%m-%d`"
  5. # 创建目录
  6. hdfs dfs -mkdir -p /user/hive/warehouse/xxx/xxx/dt=$do_date
  7. # 数据迁移
  8. python2 $DATAX_HOME/datax.py -p "-Ddo_date=$do_date" JSON脚本
  9. # 加载数据
  10. hive -e "alter table xxx库.xxx表 add partition(dt='$do_date')"

3.出现的问题

问题1:数据库连接问题

  1. 第一: 地址错误 使用IP
  2. 第二: 帐户和密码错误
  3. 第三: 表名错误 / 字段问题
  4. 对于这种问题,则是进行核查就好

问题2:HDFS 连接问题

  1. 第一: HDFS地址问题 ,我犯了最最低级的问题,最开始使用了主机名,而没有用IP
  2. 第二:就是连接不上,可以反复重启一下集群解决

问题3:内存不够的问题,报了以下错误

  1. # There is insufficient memory for the Java Runtime Environment to continue.
  2. # Native memory allocation (malloc) failed to allocate 160088 bytes for AllocateHeap
  3. # An error report file with more information is saved as:
  4. # /users/xxx/hs_err_pidxxxx.log

出现这种问题,说明申请内存失败,遇到这种问题,可能由以下两种原因:

  1. 系统物理内存或虚拟内存不足
  2. 程序在压缩指针模式下运行, Java堆会阻塞本地堆的增长
    首先使用free-m 查询,查看内存使用情况,如下图所示已经是设置过后的图,剩余内存也不是很多,针对内存过小的问题,通过修改配置文件即可。

由于/proc/meminfo下的vm.overcommit_memory被设置成不允许overcommit造成的overcommit是指对虚拟内存的过量分配; 用户进程申请的是虚拟地址,虚拟内存需要物理内存做支撑, 如果分配太多虚拟内存, 会对性能参数影响.

vm.overcommit_memory的用处: 控制过量分配的策略. 这个参数一共有3个可选值:

  1. 0: Heuristic overcommit handling. 就是由操作系统自己决定过量分配策略
  2. 1: Always overcommit. 一直允许过量分配
  3. 2: Don't overcommit. 不允许过量分配

通过sysctl vm.overcommit_memory 来查看overcommit_memory的参数设置

则通过以下几种方式进行设置:

  1. /etc/sysctl.conf 文件中添加一行vm.overcommit_memory=1 即可,之后再使用sysctl -p 使配置文件永久生效当然这是我们在开发环境下的解决方式, 在生产环境还是要尽量去优化调整JVM的参数来保证每个程序都有足够的内存来保证运行.
  2. echo 1 > /proc/sys/vm/overcommit_memory 此方式临时生效,系统重启后消失
  3. sudo sysctl vm.overcommit_memory=0, 即vm.overcommit_memory = 0, 允许系统自己决定过量分配策略

问题4:导入hive之后数据全为NULL

  1. 出现这种问题,主要是分隔符的问题,在hive端建表的时候,设置分隔符最好和JSON脚本中的一致,在网上查了好多情况,有的说是要使用`'\t'` 分隔符,我测试的结果是我和JSON脚本中设置成一致成功了。
  1. CREATE EXTERNAL
  2. TABLE `ods_trade_shops` (
  3. `shopId` int,
  4. `userId` int
  5. )
  6. PARTITIONED by
  7. (
  8. dt string
  9. )
  10. row format delimited fields terminated by ',';

问题5:在建表的时候需要注意以下,数据类型的问题。

MySQL数据类型 Datax数据类型 HIVE数据类型
int, tinyint, smallint, mediumint, int, bigint LONG int/tinyint/bigint/smallint
float, double, decimal DOUBLE float double
varchar, char, tinytext, text, mediumtext, longtext, year STRING string
date, datetime, timestamp, time DATE date/ timestamp/string
bit , bool BOOLEAN boolean

DATAX在转换MySQL datatime字段类型为hive的timestamp时会出现问题:datax和hive不支持datetime类型,因此可能回出现同步之后时间字段多/少8小时。
解决办法有两个:
1、转换为string类型;
2、继续用timestamp类型,但是需要行存储(即text存储)。

遇见时间类型转换问题时要小心,保守最好是string,简单的比较大小不会影响后续计算。