大数据的基本特征

Volume 海量 数据量大 根据IDC作出的估测, 数据一直都在以每年 50%的速度增长
Velocity 快速 处理速度快 从数据的生成到消耗,时间窗口非常小,可用于生 成决策的时间非常少
Variety 多样 数据类型繁多 大数据是由结构化和非结构化数据组成的
Value 价值 价值密度低,商业价值高
Veracity 准确 全面而非抽样

Hadoop 生态系统及每个部分的具体功能

image.png

组件 功能
HDFS 分布式文件系统
MapReduce 分布式并行编程模型
YARN 资源管理和调度器
Tez 运行在YARN之上的下一代Hadoop查询处理框架
Hive Hadoop上的数据仓库
HBase Hadoop上的非关系型的分布式数据库
Pig 一个基于Hadoop的大规模数据分析平台,提供类似SQL的查询语言Pig Latin
Sqoop 用于在Hadoop与传统数据库之间进行数据传递
Oozie Hadoop上的工作流管理系统
Zookeeper 提供分布式协调一致性服务
Storm 流计算框架
Flume 一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统
Ambari Hadoop快速部署工具,支持Apache Hadoop集群的供应、管理和监控
Kafka 一种高吞吐量的分布式发布订阅消息系统,可以处理消费者规模的阅站中的所有动作流数据
Spark 类似于Hadoop MapReduce的通用并行框架

Hadoop集群搭建

配置文件:
hadoop-env.sh
core-site.xml
hdfs-site.xml
mapred-site.xml
yarn-site.xml

Linux常用命令

sudo mount -t vboxsf 文件名 /(共享文件夹位置) 挂载
tar -zxvf 文件 -C /(解压位置) 解压命令
ifconfig 查看ip
mkdir 创建目录
mkdir -p 创建多层目录
rm 删除文件
rmdir 删除空目录
vi.vim i插入 esc→: →q退出 wq保存退出 q!强制退出 wq!强制保存退出
cp 复制
vim ~/.bashrc 修改环境变量
source ~/.bashrc 环境变量生效
java -version 查看java版本
hadoop version 查看hadoop版本
chown -R 更改权限
hadoop namenode -format 名称节点格式化
./ + 文件(start-all.sh / stop-all.sh) 启动/关闭
jps 查看进程
hadoop fs -copyFromLocal /123.txt /in 上传文件到hdfs文件中

HDFS

抄吧

1.试述HDFS中的名称节点和数据节点的具体功能。

名称节点NameNode主要以元数据的形式进行管理和存储,用于维护文件系统名称并管理客户端对文件的访问;NameNode记录对文件系统名称空间或其属性的任何更改操作;一旦NameNode关闭,就无法访问Hadoop集群。
数据节点DataNode在客户端或者NameNode的调度下,存储并检索数据块,对数据块进行创建、删除等操作,并且定期向NameNode发送所存储的数据块列表。

2.在分布式文件系统中,中心节点的设计至关重要,请阐述HDFS是如何减轻中心节点的负担的。

在客户端需要访问一个文件时,名称节点并不参与数据的传输,而是只将数据节点位置发给客户端,因此实现了一个文件的数据能够在不同的数据节点上实现并发访问,大大提高了数据访问速度并减轻了中心服务器的负担,方便了数据管理。

3.试述HDFS的冗余数据保存策略。

HDFS采用了多副本方式对数据进行冗余存储,通常一个数据块的多个副本会被分布到不同的数据节点上。这种多副本方式具有以下几个优点:(1)加快数据传输速度(2)容易检查数据错误(3)保证数据可靠性

4.请阐述HDFS在不发生故障的情况下读文件的过程。

1) 客户端打开文件,创建输入流;
2) 输入流通过远程调用名称节点,获得文件开始部分数据块的保存位置;
3) 客户端得到位置后开始读取数据,输入流选择距离客户端最近的数据节点建立连接并读取数据;
4) 数据从该数据节点读取至客户端结束时,关闭连接;
5) 输入流查找下一个数据块;
6) 找到该数据块的最佳数据节点,读取数据;
7) 客户端读取完毕数据时,关闭输入流。

5.请阐述HDFS在不发生故障的情况下写文件的过程。

1) 客户端创建文件和输出流;
2) HDFS调用名称节点,在文件系统的命名空间中建一个新的文件,并执行检查;检查通过后,名称节点会构造一个新文件夹,并添加文件信息;
3) 客户端通过输出流向HDFS的文件写入数据;
4) 客户端写入的数据首先会被分成一个个的分包,将分包放入输出流对象的内部队列,并向名称节点申请若干个数据节点,然后通过流水线复制策略打包成数据包发送出去;
5) 为保证所有数据节点的数据都是准确的,需要数据节点向发送者发送“确认包”,当客户端收到应答时,将对应的分包从内部队列移除。不断执行3~5直至数据写完;
6) 客户端关闭输出流,通知名称节点关闭文件。

MapReduce

一个策略 分而治之 split->map=>reduce
一个架构 主从架构 master/slaves
一个理念 计算向数据靠拢

Map和Reduce的过程

MapReduce计算模型的核心是Map函数和Reduce函数,试述这两个函数各自的输入、输出以及处理过程。

Map函数的输入是来自于分布式文件系统的文件块,这些文件块的格式是任意的,可以是文档,也可以是二进制格式。文件块是一系列元素的集合,这些元素是任意类型的,同一个元素不能跨文件块存储。Map函数将输入的元素转换成形式的键值对,键和值的类型也是任意的,其中,键不同于一般的标志属性,即键没有唯一性,不能作为输出的身份标识,即使是同一输入元素,也可通过一个Map任务生成具有相同键的多个

Reduce 函数的任务就是将输入的一系列具有相同键的键值对以某种方式组合起来,输出处理后的键值对,输出结果会合并成一个文件。用户可以指定Reduce任务的个数(如n个),并通知实现系统,然后主控进程通常会选择一个Hash函数,Map任务输出的每个键都会经过Hash函数计算,并根据哈希结果将该键值对输入相应的Reduce任务来处理。对于处理键为k的Reduce任务的输入形式为>,输出为

试述MapReduce的工作流程(需包括提交任务、Map, Shuffle, Reduce的过程)


①MapReduce 框架使用InputFormat模块做Map前的预处理,比如,验证输入的格式是否符合输入定义;然后,将输入文件切分为逻辑上的多个InputSplit,InputSplit是MapReduce对文件进行处理和运算的输入单位,只是一个逻辑概念,每个InputSplit 并没有对文件进行实际切割,只是记录了要处理的数据的位置和长度。

②因为lnputSplit 是逻辑切分而非物理切分,所以,还需要通过RecordReader(RR根据InputSplit中的信息来处理InputSplit中的具体记录,加载数据并转换为适合Map任务读取的键值对,输入给Map任务。

③Map任务会根据用户自定义的映射规则,输出一系列的作为中间结果。

④为了让Reduce可以并行处理Map的结果,需要对Map的输出进行-一定的分区、排序(Sort)、合并(Combine)、归并(Merge)等操作,得到形式的中间结果,再交给对应的Reduce进行处理,这个过程称为Shuffle。

⑤Reduce 以一系列中间结果作为输入,执行用户定义的逻辑,输出结果给OutputFormat模块。

⑥OutputFormat 模块会验证输出目录是否已经存在以及输出结果类型是否符合配置文件中的配置类型,如果都满足,就输出Reduce的结果到分布式文件系统。


Shuffle过程是MapReduce工作流程的核心,也被称为奇迹发生的地方,.试分析Shuffle 过程的作用

了解


Shuffle,是指对Map输出结果进行分区、排序、合并等处理并交给Reduce的过程。Shuffle过程分为Map端的操作和Reduce端的操作。

①在Map端的Shuffle过程。Map的输出结果首先被写入缓存,当缓存满时,就启动溢写操作,把缓存中的数据写入磁盘文件,并清空缓存。当启动溢写操作时,首先需要把缓存中的数据进行分区,然后对每个分区的数据进行排序(Sort和合并(Combine)之后再写入磁盘文件。每次溢写操作会生成-一个新的磁盘文件,随着Map任务的执行,磁盘中就会生成多个溢写文件。在Map任务全部结束之前,这些溢写文件会被归并(Merge)成-一个大的磁盘文件,然后,通知相应的Reduce任务来领取属于自己处理的数据。

②在Reduce端的Shuffle 过程。Reduce任务从Map端的不同Map机器领回属于自己处理的那部分数据,然后,对数据进行归并(Merge)后交给Reduce处理。

试画出使用 MapReduce来对英语句子“ Whatever is worth doing is worth doing well进行单词统计的过程。

重点

image.png

Hadoop的Java运行程序代码

非常重要!!!最后大题应该是其中的一个,如果不追求高分,可以不用写

MakeDir

  1. public class MakeDir {
  2. public static void main(String[] args)throws IOException,URISyntaxException{
  3. Configuration conf=new Configuration();
  4. String hdfsPath="hdfs://localhost:9000";
  5. FileSystem hdfs = FileSystem.get(new URI(hdfsPath),conf);
  6. String newDir="/";
  7. boolean result = hdfs.mkdirs(new Path(newDir));
  8. if(result){
  9. System.out.println("Success!");
  10. }else{
  11. System.out.println("Failed!");
  12. }
  13. }
  14. }

New_Witer

  1. public class New_Witer {
  2. public static void main(String[] args)throws IOException,URISyntaxException{
  3. Configuration conf=new Configuration();
  4. String hdfsPath="hdfs://localhost:9000";
  5. FileSystem hdfs = FileSystem.get(new URI(hdfsPath),conf);
  6. String filePath="/xm123/writefile";
  7. FSDataOutputStream create = hdfs.create(new Path(filePath));
  8. System.out.println("Step 1 Finish!");
  9. String sayHi="123小明";
  10. byte[] buff=sayHi.getBytes();
  11. create.write(buff,0,buff.length);
  12. create.close();
  13. System.out.println("Step 2 Finish!");
  14. }
  15. }

TouchFile

  1. public class TouchFile {
  2. public static void main(String[] args)throws IOException,URISyntaxException{
  3. Configuration conf=new Configuration();
  4. String hdfsPath="hdfs://localhost:9000";
  5. FileSystem hdfs = FileSystem.get(new URI(hdfsPath),conf);
  6. String filePath="/xm/123";
  7. FSDataOutputStream create = hdfs.create(new Path(filePath));
  8. System.out.println("Finish!");
  9. }
  10. }

UpLoad

  1. public class UpLoad {
  2. public static void main(String[] args)throws IOException,URISyntaxException{
  3. Configuration conf=new Configuration();
  4. String hdfsPath="hdfs://localhost:9000";
  5. FileSystem hdfs = FileSystem.get(new URI(hdfsPath),conf);
  6. String from_Linux="/123.txt";
  7. String to_HDFS="/xiaoming123/";
  8. hdfs.copyFromLocalFile(new Path(from_Linux),new Path(to_HDFS));
  9. System.out.println("Finish!");
  10. }
  11. }

wordcount

  1. public class wordcount {
  2. public static void main(String[] args) throws IOException,
  3. ClassNotFoundException, InterruptedException {
  4. Job job = Job.getInstance();
  5. job.setJobName("WordCount");
  6. job.setJarByClass(wordcount.class);
  7. job.setMapperClass(doMapper.class);
  8. job.setReducerClass(doReducer.class);
  9. job.setOutputKeyClass(Text.class);
  10. job.setOutputValueClass(IntWritable.class);
  11. Path in = new Path("hdfs://localhost:9000/ ");
  12. Path out = new Path("hdfs://localhost:9000/out");
  13. FileInputFormat.addInputPath(job, in);
  14. FileOutputFormat.setOutputPath(job, out);
  15. System.exit(job.waitForCompletion(true) ? 0 : 1);
  16. }
  17. public static class doMapper extends
  18. Mapper<Object, Text, Text, IntWritable> {
  19. public static final IntWritable one = new IntWritable(1);
  20. public static Text word = new Text();
  21. @Override
  22. protected void map(Object key, Text value, Context context)
  23. throws IOException, InterruptedException {
  24. StringTokenizer tokenizer = new StringTokenizer(value.toString()," ");
  25. word.set(tokenizer.nextToken());
  26. context.write(word, one);
  27. }
  28. }
  29. public static class doReducer extends
  30. Reducer<Text, IntWritable, Text, IntWritable> {
  31. private IntWritable result = new IntWritable();
  32. @Override
  33. protected void reduce(Text key, Iterable<IntWritable> values,
  34. Context context) throws IOException, InterruptedException {
  35. int sum = 0;
  36. for (IntWritable value : values) {
  37. sum += value.get();
  38. }
  39. result.set(sum);
  40. context.write(key, result);
  41. }
  42. }
  43. }

写在最后

有错误或者遗漏欢迎在评论区指出