一、屡一下思路:

大数据:数据的存储,读取,分析,采集(Flume,Sqoop,DataX等)。
大数据分析:MapReduce —> Hive (离线分析) —>spark,flink(实时分析)
本次项目周:电商类型的离线数据分析项目。
用到的技术有:Hive,HDFS,Flume,Azkaban,SuperSet,DataX(Sqoop)

二、实战将三个数据来源导入到数据仓库的ODS层

image.png

第一步:导入mysql数据到ods层

1、在mysql中,创建nshop数据库,导入业务数据。

image.png
image.png
mysql的业务数据已经导入完毕。

2、在hive中创建对应的ods层数据库及各种表

hive要想能运行,必须保证集群环境是启动的。
image.png
hive 直接接入我们的hive系统。
show databases; //查看所有的数据库
drop database ods_nshop cascade; //级联删除ods_nshop数据库包含里面的表。
我们可以按照mysql中的表字段,创建我们的hive中的表字段。
image.png
在hive表中创建mysql对应的表(此处省略)

编写sqoop脚本,将mysql的数据一对一的导入到hive的表中。
案例:

  1. sqoop import \
  2. --connect jdbc:mysql://bigdata01:3306/nshop \
  3. --username root \
  4. --password 123456 \
  5. --query "select * from customer where \$CONDITIONS" \
  6. --target-dir /data/nshop/ods/ods_02_customer \
  7. --hive-import \
  8. --hvie-table ods_02_customer \
  9. --delete-target-dir \
  10. -m 1

编写真正的脚本,大约有12个左右,因为执行时间比较长,所有我们编写为azkaban任务:

  1. #!/bin/bash
  2. sqoop import \
  3. --connect jdbc:mysql://bigdata01:3306/nshop \
  4. --query "select customer_id,customer_login,customer_nickname,customer_name,customer_pass,customer_mobile,customer_idcard,customer_gender,customer_birthday,customer_age,customer_age_range,customer_email,customer_natives,customer_ctime,customer_utime,customer_device_num from customer where \$CONDITIONS" \
  5. --username root \
  6. --password 123456 \
  7. --target-dir /data/nshop/ods/ods_02_customer \
  8. --hive-import \
  9. --hive-database ods_nshop \
  10. --hive-table ods_02_customer \
  11. --delete-target-dir \
  12. -m 1
  13. sqoop import \
  14. --connect jdbc:mysql://bigdata01:3306/nshop \
  15. --query "select customer_id,attention_id,attention_type,attention_status,attention_ctime from customer_attention where \$CONDITIONS" \
  16. --username root \
  17. --password 123456 \
  18. --target-dir /data/nshop/ods/ods_02_customer_attention \
  19. --hive-import \
  20. --hive-database ods_nshop \
  21. --hive-table ods_02_customer_attention \
  22. --delete-target-dir \
  23. -m 1
  24. sqoop import \
  25. --connect jdbc:mysql://bigdata01:3306/nshop \
  26. --query "select consignee_id,customer_id,consignee_name,consignee_mobile,consignee_zipcode,consignee_addr,consignee_tag,ctime from customer_consignee where \$CONDITIONS" \
  27. --username root \
  28. --password 123456 \
  29. --target-dir /data/nshop/ods/ods_02_customer_consignee \
  30. --hive-import \
  31. --hive-database ods_nshop \
  32. --hive-table ods_02_customer_consignee \
  33. --delete-target-dir \
  34. -m 1
  35. sqoop import \
  36. --connect jdbc:mysql://bigdata01:3306/nshop \
  37. --username root \
  38. --password 123456 \
  39. --query "select category_code,category_name,category_parent_id,category_status,category_utime from category where \$CONDITIONS" \
  40. --target-dir /data/nshop/ods/dim_pub_category \
  41. --hive-import \
  42. --hive-database ods_nshop \
  43. --hive-table dim_pub_category \
  44. --delete-target-dir \
  45. -m 1
  46. sqoop import \
  47. --connect jdbc:mysql://bigdata01:3306/nshop \
  48. --username root \
  49. --password 123456 \
  50. --query "select supplier_code,supplier_name,supplier_type,supplier_status,supplier_utime from supplier where \$CONDITIONS" \
  51. --target-dir /data/nshop/ods/dim_pub_supplier \
  52. --hive-import \
  53. --hive-database ods_nshop \
  54. --hive-table dim_pub_supplier \
  55. --delete-target-dir \
  56. -m 1
  57. sqoop import \
  58. --connect jdbc:mysql://bigdata01:3306/nshop \
  59. --username root \
  60. --password 123456 \
  61. --query "select order_id,customer_id,order_status,customer_ip,customer_longitude,customer_latitude,customer_areacode,consignee_name,consignee_mobile,consignee_zipcode,pay_type,pay_code,pay_nettype,district_money,shipping_money,payment_money,order_ctime,shipping_time,receive_time from orders where \$CONDITIONS" \
  62. --target-dir /data/nshop/ods/ods_02_orders \
  63. --hive-import \
  64. --hive-database ods_nshop \
  65. --hive-table ods_02_orders \
  66. --delete-target-dir \
  67. -m 1
  68. sqoop import \
  69. --connect jdbc:mysql://bigdata01:3306/nshop \
  70. --username root \
  71. --password 123456 \
  72. --query "select order_detail_id,order_id,product_id,product_name,product_remark,product_cnt,product_price,weighing_cost,district_money,is_activity,order_detail_ctime from order_detail where \$CONDITIONS" \
  73. --target-dir /data/nshop/ods/ods_02_order_detail \
  74. --hive-import \
  75. --hive-database ods_nshop \
  76. --hive-table ods_02_order_detail \
  77. --delete-target-dir \
  78. -m 1
  79. sqoop import \
  80. --connect jdbc:mysql://bigdata01:3306/nshop \
  81. --username root \
  82. --password 123456 \
  83. --query "select page_code,page_remark,page_type,page_target,page_ctime from page_dim where \$CONDITIONS" \
  84. --target-dir /data/nshop/ods/dim_pub_page \
  85. --hive-import \
  86. --hive-database ods_nshop \
  87. --hive-table dim_pub_page \
  88. --delete-target-dir \
  89. -m 1
  90. sqoop import \
  91. --connect jdbc:mysql://bigdata01:3306/nshop \
  92. --username root \
  93. --password 123456 \
  94. --query "select region_code,region_code_desc,region_city,region_city_desc,region_province,region_province_desc from area_dim where \$CONDITIONS" \
  95. --target-dir /data/nshop/ods/dim_pub_area \
  96. --hive-import \
  97. --hive-database ods_nshop \
  98. --hive-table dim_pub_area \
  99. --delete-target-dir \
  100. -m 1
  101. sqoop import \
  102. --connect jdbc:mysql://bigdata01:3306/nshop \
  103. --username root \
  104. --password 123456 \
  105. --query "select date_day,date_day_desc,date_day_month,date_day_year,date_day_en,date_week,date_week_desc,date_month,date_month_en,date_month_desc,date_quarter,date_quarter_en,date_quarter_desc,date_year from date_dim where \$CONDITIONS" \
  106. --target-dir /data/nshop/ods/dim_pub_date \
  107. --hive-import \
  108. --hive-database ods_nshop \
  109. --hive-table dim_pub_date \
  110. --delete-target-dir \
  111. -m 1
  112. sqoop import \
  113. --connect jdbc:mysql://bigdata01:3306/nshop \
  114. --username root \
  115. --password 123456 \
  116. --query "select pay_id,order_id,customer_id,pay_status,pay_type,pay_code,pay_nettype,pay_amount,pay_ctime from orders_pay_records where \$CONDITIONS" \
  117. --target-dir /data/nshop/ods/ods_02_orders_pay_records \
  118. --hive-import \
  119. --hive-database ods_nshop \
  120. --hive-table ods_02_orders_pay_records \
  121. --delete-target-dir \
  122. -m 1
  123. sqoop import \
  124. --connect jdbc:mysql://bigdata01:3306/nshop \
  125. --username root \
  126. --password 123456 \
  127. --query "select product_code,product_name,product_remark,category_code,supplier_code,product_price,product_weighing_cost,product_publish_status,product_audit_status,product_bar_code,product_weight,product_length,product_height,product_width,product_colors,product_date,product_shelf_life,product_ctime,product_utime from product where \$CONDITIONS" \
  128. --target-dir /data/nshop/ods/dim_pub_product \
  129. --hive-import \
  130. --hive-database ods_nshop \
  131. --hive-table dim_pub_product \
  132. --delete-target-dir \
  133. -m 1
  1. nodes:
  2. - name: ods
  3. type: command
  4. config:
  5. command: sh sqoopJob.sh
  1. azkaban-flow-version: 2.0

总共三个脚本,打包成zip,上传至azkaban即可。
192.168.32.100:8081 admin admin
image.png
以上这个步骤最重要的是注意字符集,修改为 utf-8 witout no-bom 格式。
image.png
image.png
修改sqoopJob.sh 中的格式为linux格式。
使用nodepad++ ,中的edit 中的 EOL-Convertion 选择为 Unix / OSX format ,保存,重新打包上传即可。
image.png
image.png

第二步:导入hdfs上的数据到hive中(埋点日志数据)

创建表(根据数据的格式)

  1. create external table if not exists ods_nshop.ods_nshop_01_useractlog(
  2. action string comment '行为类型:install安装|launch启动|interactive交互|page_enter_h5页面曝光|page_enter_native页面进入|exit退出',
  3. event_type string comment '行为类型:click点击|view浏览|slide滑动|input输入',
  4. customer_id string comment '用户id',
  5. device_num string comment '设备号',
  6. device_type string comment '设备类型',
  7. os string comment '手机系统',
  8. os_version string comment '手机系统版本',
  9. manufacturer string comment '手机制造商',
  10. carrier string comment '电信运营商',
  11. network_type string comment '网络类型',
  12. area_code string comment '地区编码',
  13. longitude string comment '经度',
  14. latitude string comment '纬度',
  15. extinfo string comment '扩展信息(json格式)',
  16. duration string comment '停留时长',
  17. ct bigint comment '创建时间'
  18. ) partitioned by (bdp_day string)
  19. ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
  20. STORED AS TEXTFILE
  21. location '/data/nshop/ods/user_action_log/'

因为日志中的数据是json数据,所以需要配置相关的内容:

  1. json格式数据表需要通过serde机制处理
  2. 在/usr/local/hive/conf 找到 hive-site.xml 修改里面的值,不是追加
  3. 1 hive-site.xml中设置三方jar
  4. <property>
  5. <name>hive.aux.jars.path</name>
  6. <value>/usr/local/hive/lib/</value>
  7. <description>The location of the plugin jars that contain implementations of user defined functions and serdes.</description>
  8. </property>
  9. 2 hive.aux.jars.path设置的路径中增加hive-hcatalog-core-3.1.2.jar
  10. linux中,将hive hcatalog 中的一个jar包复制到 hivelib
  11. cp $HIVE_HOME/hcatalog/share/hcatalog/hive-hcatalog-core-3.1.2.jar /usr/local/hive/lib/

如果之前没有配置过响应的配置,需要hive退出,再进入一次。
将日志数据加载到创建的表中(这个日志数据可以先提交到hdfs再加载,也可以在本地加载,真实的情况应该是在hdfs上加载。)
将本地的000000_0 这个日志数据放入到linux 本地的 /root/hive_data 下

  1. load data local inpath '/root/hivedata/000000_0' into table ods_nshop.ods_nshop_01_useractlog partition(bdp_day="20220509");

第三步:导入第三方数据到hive表中

创建这个表:

  1. create external table if not exists ods_nshop.ods_nshop_01_releasedatas(
  2. device_num string comment '设备号',
  3. device_type string comment '设备类型',
  4. os string comment '手机系统',
  5. os_version string comment '手机系统版本',
  6. manufacturer string comment '手机制造商',
  7. area_code string comment '地区编码',
  8. release_sid string comment '投放请求id',
  9. release_session string comment '投放会话id',
  10. release_sources string comment '投放渠道',
  11. release_params string comment '投放请求参数',
  12. ct bigint comment '创建时间'
  13. ) partitioned by (bdp_day string)
  14. ROW FORMAT DELIMITED
  15. FIELDS TERMINATED BY ','
  16. stored as textfile
  17. location '/data/nshop/ods/release/'

因为第三方数据是由广告方提供的,每天提供一些,通过接口传递过来的,我们可以每天生成一个文件夹,将数据存储到对应的文件夹下,比如/usr/local/flume/data 下。
可以从
image.png
将此数据上传至 /usr/local/flume/data下即可。
编写flume脚本,放在conf下
image.png

  1. # 命名
  2. a1.sources = r1
  3. a1.channels = c1
  4. a1.sinks = s1
  5. # 关联
  6. a1.sources.r1.channels = c1
  7. a1.sinks.s1.channel = c1
  8. # 配置source类型和属性
  9. a1.sources.r1.type = TAILDIR
  10. a1.sources.r1.filegroups = g1
  11. a1.sources.r1.filegroups.g1 = /usr/local/flume/data/.*.csv
  12. # 元数据保存位置
  13. a1.sources.r1.positionFile = /usr/local/flume/flume-log/taildir_position.json
  14. # 配置channel类型属性
  15. a1.channels.c1.type = memory
  16. # 缓存池大小
  17. a1.channels.c1.capacity = 1000
  18. # 每个事务sink拉取的大小
  19. a1.channels.c1.transactionCapacity = 100
  20. # 配置Sink类型和属性
  21. a1.sinks.s1.type = hdfs
  22. a1.sinks.s1.hdfs.path = hdfs://bigdata01:9820/data/nshop/ods/release/%Y%m%d
  23. a1.sinks.s1.hdfs.fileSuffix = .log
  24. # 下面三个配置参数如果都设置为0,那么表示不执行次参数(失效)
  25. a1.sinks.s1.hdfs.rollInterval = 10
  26. a1.sinks.s1.hdfs.rollSize = 0
  27. a1.sinks.s1.hdfs.rollCount = 0
  28. # 设置采集文件格式 如果你是存文本文件,就是用DataStream
  29. a1.sinks.s1.hdfs.fileType = DataStream
  30. a1.sinks.s1.hdfs.writeFormat = Text
  31. # 开启本地时间戳获取参数,因为我们的目录上面已经使用转义符号,所以要使用时间戳
  32. a1.sinks.s1.hdfs.useLocalTimeStamp = true

启动脚本进行采集,将数据从文件夹采集到hdfs上。

  1. flume-ng agent -n a1 -c ../conf -f agentData.conf -Dflume.root.logger=INFO,console

这个命令必须在哪里启动: /usr/local/flume/flumeconf 下启动的。
采集到了hdfs上,这个命令不会停止的,因为它监听的是一个文件夹,它要一直观察这个文件夹下的文件是否发送了变化。
image.png
导入hdfs数据到hive表中:

  1. load data inpath "/data/nshop/ods/release/20220503/*" into table ods_nshop.ods_nshop_01_releasedatas partition(bdp_day='20220509');