[01].项目代码

01.ETLMapper

  1. import java.io.IOException;
  2. import org.apache.hadoop.io.LongWritable;
  3. import org.apache.hadoop.io.NullWritable;
  4. import org.apache.hadoop.io.Text;
  5. import org.apache.hadoop.mapreduce.Mapper;
  6. //LongWritable:偏移量(不需要)
  7. //Text:一行数据
  8. //NullWritable:不需要
  9. //Text:符合清洗规则的数据
  10. public class ETLMapper extends Mapper<LongWritable, Text, NullWritable, Text> {
  11. @Override
  12. protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, NullWritable, Text>.Context context)
  13. throws IOException, InterruptedException {
  14. String data = value.toString();
  15. String fields[] = data.split("\t");
  16. //字段数目:3
  17. if(fields.length!=3) {
  18. System.out.println("[长度不符]:"+data);
  19. }
  20. //id长度:4
  21. if(fields[0].length()!=4) {
  22. System.out.println("[ID长度不符]");
  23. }
  24. //字段不是null
  25. for (String item : fields) {
  26. if(item.equals("null")) {
  27. System.out.println("[字段为空]");
  28. return ;
  29. }
  30. }
  31. context.write(NullWritable.get(), value);
  32. }
  33. }

02.ETLDriver

  1. import java.io.IOException;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.fs.Path;
  4. import org.apache.hadoop.io.NullWritable;
  5. import org.apache.hadoop.io.Text;
  6. import org.apache.hadoop.mapreduce.Job;
  7. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  8. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  9. public class ETLDriver {
  10. public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
  11. System.out.println("[开始操作]");
  12. // 1.创建job对象
  13. Configuration conf = new Configuration();
  14. Job job = Job.getInstance(conf);
  15. // 2.设置jar路径
  16. job.setJarByClass(ETLDriver.class);
  17. // 3.关联map与red
  18. job.setMapperClass(ETLMapper.class);
  19. // 4.设置map输出的键值对类型
  20. job.setMapOutputKeyClass(NullWritable.class);
  21. job.setMapOutputValueClass(Text.class);
  22. // 5.设置最终数据输出键值对类型
  23. job.setOutputKeyClass(NullWritable.class);
  24. job.setOutputValueClass(Text.class);
  25. // 6.设置输入路径(FileInputFormat)和输出路径(FileOutputFormat)
  26. // FileInputFormat.setInputPaths(job, new Path(args[0]));19
  27. // FileOutputFormat.setOutputPath(job, new Path(args[1]));
  28. //本地测试
  29. FileInputFormat.setInputPaths(job, new Path("D:\\360MoveData\\Users\\AIGameJXB\\Desktop\\hive\\input\\student.txt"));
  30. //注意:output的目录必须不存在!
  31. FileOutputFormat.setOutputPath(job, new Path("D:\\360MoveData\\Users\\AIGameJXB\\Desktop\\hive\\output"));
  32. // 7.提交job
  33. boolean result = job.waitForCompletion(true);// true:打印运行信息
  34. System.out.println("[执行完毕]");
  35. System.exit(result ? 0 : 1);// 1:非正常退出
  36. }
  37. }
  • 和之前的wordcount项目一样,将其打包为jar文件,上传运行。

    [02].本地测试

  • 准备数据(为了方便放在桌面)

33.数据清洗案例 - 图1

  • 启动hadoop

33.数据清洗案例 - 图2
之前说过了启动方式,这里就只贴图。

  • 接下来直接在Eclipse运行测试

    [03].Hadoop测试

  • 准备资源:把txt和打好的jar包上传到虚拟机

33.数据清洗案例 - 图3

  • 上传到hadoop

    1. hadoop fs -mkdir /hiveinput
    1. hadoop fs -put ./student.txt /hiveinput
  • 切回到jar包的目录开始执行

    1. hadoop jar ./etl.jar /hiveinput/student.txt /hiveoutput

    33.数据清洗案例 - 图4

  • 接下来看一下生成的数据

33.数据清洗案例 - 图5

  1. hadoop fs -cat /hiveoutput/part-r-00000

33.数据清洗案例 - 图6