大数据技术之Hadoop(HDFS)

1. HDFS 概述

1.1 HDFS 产出背景及定义

1)HDFS 产生背景

  • 随着数据量越来越大,在一个操作系统存不下所有的数据,那么就分配到更多的操作系 统管理的磁盘中,但是不方便管理和维护,迫切需要一种系统来管理多台机器上的文件,这 就是分布式文件管理系统。HDFS 只是分布式文件管理系统中的一种。

2)HDFS 定义

  • HDFS(Hadoop Distributed File System),它是一个文件系统,用于存储文件,通过目 录树来定位文件;其次,它是分布式的,由很多服务器联合起来实现其功能,集群中的服务 器有各自的角色。
  • HDFS 的使用场景:适合一次写入,多次读出的场景。一个文件经过创建、写入和关闭 之后就不需要改变。

1.2 HDFS 优缺点

HDFS优点

  • 高容错
    • 数据自动保存多个副本。它通过增加副本的形式,提高容错性。
    • 某一个副本丢失以后,它可以自动恢复。
  • 适合处理大数据
    • 数据规模:能够处理数据规模达到GB、TB、甚至PB级别的数据;
    • 文件规模:能够处理百万规模以上的文件数量,数量相当之大。
  • 可构建在廉价机器上,通过多副本机制,提高可靠性

HDFS缺点

  • 不适合低延时数据访问,比如毫秒级的存储数据,是做不到的。
  • 无法高效的对大量小文件进行存储。
    • 存储大量小文件的话,它会占用NameNode大量的内存来存储文件目录和 块信息。这样是不可取的,因为NameNode的内存总是有限的;
    • 小文件存储的寻址时间会超过读取时间,它违反了HDFS的设计目标。
  • 不支持并发写入、文件随机修改。
    • 一个文件只能有一个写,不允许多个线程同时写;
    • 仅支持数据append(追加),不支持文件的随机修改。

1.3 HDFS 组成架构

1)NameNode(nn):就是Master,它 是一个主管、管理者。

  • 管理HDFS的名称空间;
  • 配置副本策略;
  • 管理数据块(Block)映射信息;
  • 处理客户端读写请求

2)DataNode:就是Slave。NameNode 下达命令,DataNode执行实际的操作。

  • 存储实际的数据块;
  • 执行数据块的读/写操作

3)Client:就是客户端。

  • 文件切分。文件上传HDFS的时候,Client将文件切分成一个一个的Block,然后进行上传;
  • 与NameNode交互,获取文件的位置信息;
  • 与DataNode交互,读取或者写入数据;
  • Client提供一些命令来管理HDFS,比如NameNode格式化;
  • Client可以通过一些命令来访问HDFS,比如对HDFS增删查改操作;

4)Secondary NameNode:并非NameNode的热备。当NameNode挂掉的时候,并不 能马上替换NameNode并提供服务。

  • 辅助NameNode,分担其工作量,比如定期合并Fsimage和Edits,并推送给NameNode ;
  • 在紧急情况下,可辅助恢复NameNode。

image-20220128125340342

1.4 HDFS 文件块大小(面试重点)

HDFS中的文件在物理上是分块存储(Block),块的大小可以通过配置参数 ( dfs.blocksize)来规定,默认大小在Hadoop2.x/3.x版本中是128M,1.x版本中是64M。

image-20220128130009008

思考:为什么块的大小不能设置太小,也不能设置太大?

  • HDFS的块设置太小,会增加寻址时间,程序一直在找块的开始位置;
  • 如果块设置的太大,从磁盘传输数据的时间会明显大于定位这个块开 始位置所需的时间。导致程序在处理这块数据时,会非常慢。 总结:HDFS块的大小设置主要取决于磁盘传输速率。

2. HDFS 的 Shell 操作(开发重点)

  • shadoop fs 具体命令 OR hdfs dfs 具体命令 两个是完全相同的。

命令大全

image-20220128130221201

2.1 常用命令实操

2.1.1 准备工作

  • 启动 Hadoop 集群(方便后续的测试)

[root@hadoop102 hadoop-3.1.3]$ sbin/start-dfs.sh

[root@hadoop103 hadoop-3.1.3]$ sbin/start-yarn.sh

  • -help:输出这个命令参数

[root@hadoop102 hadoop-3.1.3]$ hadoop fs -help rm

  • 3)创建/sanguo 文件夹

[root@hadoop102 hadoop-3.1.3]$ hadoop fs -mkdir /sanguo

2.1.2 上传

  • -moveFromLocal:从本地剪切粘贴到HDFS

[root@hadoop102 hadoop-3.1.3]$ cat >> shuguo.txt<<EOF
shuguo
EOF

[root@hadoop102 hadoop-3.1.3]$ hadoop fs -moveFromLocal ./shuguo.txt /sanguo

  • -copyFromLocal:从本地文件系统中拷贝文件到HDFS路径去

[root@hadoop102 hadoop-3.1.3]$ cat >> weiguo.txt<<EOF
weiguo
EOF

[root@hadoop102 hadoop-3.1.3]$ hadoop fs -copyFromLocal weiguo.txt /sanguo

  • -put:等同于copyFromLocal,生产环境更习惯用put

[root@hadoop102 hadoop-3.1.3]$ cat >> wuguo.txt<<EOF
wuguo
EOF

[root@hadoop102 hadoop-3.1.3]$ hadoop fs -put ./wuguo.txt /sanguo

  • -appendToFile:追加一个文件到已经存在的文件末尾

[root@hadoop102 hadoop-3.1.3]$ cat >> liubei.txt<<EOF
liubei
EOF

[atguigu@hadoop102 hadoop-3.1.3]$ hadoop fs -appendToFile liubei.txt /sanguo/shuguo.txt

2.1.3 下载

  • -copyToLocal:从 HDFS 拷贝到本地

[root@hadoop102 hadoop-3.1.3]$ hadoop fs -copyToLocal /sanguo/shuguo.txt ./

  • -get:等同于 copyToLocal,生产环境更习惯用 get

[root@hadoop102 hadoop-3.1.3]$ hadoop fs -get /sanguo/shuguo.txt ./shuguo2.txt

2.1.4 HDFS 直接操作

  • -ls: 显示目录信息

[root@hadoop102 hadoop-3.1.3]$ hadoop fs -ls /sanguo

  • -cat:显示文件内容

[root@hadoop102 hadoop-3.1.3]$ hadoop fs -cat /sanguo/shuguo.txt

  • -chgrp、-chmod、-chown:Linux文件系统中的用法一样,修改文件所属权限

[root@hadoop102 hadoop-3.1.3]$ hadoop fs -chmod 666 /sanguo/shuguo.txt

[root@hadoop102 hadoop-3.1.3]$ hadoop fs -chown zzxx:zzxx /sanguo/shuguo.txt

  • -mkdir:创建路径

[root@hadoop102 hadoop-3.1.3]$ hadoop fs -mkdir /jinguo

  • -cp:从HDFS的一个路径拷贝到HDFS的另一个路径

[root@hadoop102 hadoop-3.1.3]$ hadoop fs -cp /sanguo/shuguo.txt /jinguo

  • -mv:在HDFS目录中移动文件

[root@hadoop102 hadoop-3.1.3]$ hadoop fs -mv /sanguo/wuguo.txt /jinguo

[root@hadoop102 hadoop-3.1.3]$ hadoop fs -mv /sanguo/weiguo.txt /jinguo

  • -tail:显示一个文件的末尾1kb的数据

[root@hadoop102 hadoop-3.1.3]$ hadoop fs -tail /jinguo/shuguo.txt

  • -rm:删除文件或文件夹

[root@hadoop102 hadoop-3.1.3]$ hadoop fs -rm /sanguo/shuguo.txt

  • -rm -r:递归删除目录及目录里面内容

[root@hadoop102 hadoop-3.1.3]$ hadoop fs -rm -r /sanguo

  • -du统计文件夹的大小信息

[root@hadoop102 hadoop-3.1.3]$ hadoop fs -du -h /jinguo

14 42 /jinguo/shuguo.txt

7 21 /jinguo/weiguo.txt

6 18 /jinguo/wuguo.tx

​ 说明:27表示文件大小;81表示27*3个副本;/jinguo表示查看的目录

  • -setrep:设置 HDFS 中文件的副本数量

[root@hadoop102 hadoop-3.1.3]$ hadoop fs -setrep 10 /jinguo/shuguo.txt

image-20220128134834578

这里设置的副本数只是记录在 NameNode 的元数据中,是否真的会有这么多副本,还得 看 DataNode 的数量。因为目前只有 3 台设备,最多也就 3 个副本,只有节点数的增加到 10 台时,副本数才能达到 10。

3. HDFS的API操作

3.1客户端环境准备

找到资料包路径下的 Windows 依赖文件夹,拷贝 hadoop-3.1.0 到非中文路径(比如 d:\)。

配置 HADOOP_HOME 环境变量

image-20220128140548690

  • 在 IDEA 中创建一个 Maven 工程 HdfsClientDemo,并导入相应的依赖坐标+日志添加
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.3</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.30</version>
</dependency>
</dependencies>
  • 在项目的src/main/resources目录下,新建一个文件,命名为“log4j.properties”,在文件中填入
1
2
3
4
5
6
7
8
log4j.rootLogger=INFO, stdout  
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
  • 创建包名:com.zzxx.hdfs
  • 创建 HdfsClient 类
1
2
3
4
5
6
7
8
9
10
@Test
public void testMkdirs() throws IOException, URISyntaxException, InterruptedException {
// 1 获取文件系统
Configuration configuration = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:8020"), configuration);
// 2 创建目录
fs.mkdirs(new Path("/xiyou/huaguoshan/"));
// 3 关闭资源
fs.close();
}

image-20220128163728496

  • 客户端去操作 HDFS 时,是有一个用户身份的。默认情况下,HDFS 客户端 API 会从采 用 Windows 默认用户访问 HDFS,会报权限异常错误。所以在访问 HDFS 时,一定要配置 用户。
1
2
3
4
5
6
7
8
9
10
11
@Test
public void testMkdirs() throws IOException, URISyntaxException, InterruptedException {
// 1 获取文件系统
Configuration configuration = new Configuration();
// FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:8020"), configuration);
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:8020"), configuration,"root");
// 2 创建目录
fs.mkdirs(new Path("/xiyou/huaguoshan/"));
// 3 关闭资源
fs.close();
}

image-20220128164107767

3.2 HDFS 的 API 案例实操

3.2.1 HDFS 文件上传(测试参数优先级)

  • 1)编写源代码
1
2
3
4
5
6
7
8
9
10
11
@Test
public void testCopyFromLocalFile() throws IOException, InterruptedException, URISyntaxException {
// 1 获取文件系统
Configuration configuration = new Configuration();
configuration.set("dfs.replication", "2");
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:8020"), configuration, "root");
// 2 上传文件
fs.copyFromLocalFile(new Path("d:/sunwukong.txt"), new Path("/xiyou/huaguoshan"));
// 3 关闭资源
fs.close();
}
  • 2)将 hdfs-site.xml 拷贝到项目的 resources 资源目录下
1
2
3
4
5
6
7
8
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
</configuration>
  • 结果为两个副本

image-20220128165712927

  • 3)参数优先级 参数优先级排序:
    • 客户端代码中设置的值
    • ClassPath 下的用户自定义配置文 件
    • 然后是服务器的自定义配置(xxx-site.xml)
    • 服务器的默认配置(xxx-default.xml)

3.2.2 HDFS 文件下载

  • 1)编写源代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Test
public void testCopyToLocalFile() throws IOException,
InterruptedException, URISyntaxException{
// 1 获取文件系统
Configuration configuration = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:8020"),
configuration, "root");
// 2 执行下载操作
// boolean delSrc 指是否将原文件删除
// Path src 指要下载的文件路径
// Path dst 指将文件下载到的路径
// boolean useRawLocalFileSystem 是否开启文件校验
fs.copyToLocalFile(false, new Path("/xiyou/huaguoshan/sunwukong.txt"), new Path("d:/sunwukong2.txt"),
true);
// 3 关闭资源
fs.close();
}
  • 注意:如果执行上面代码,下载不了文件,有可能是你电脑的微软支持的运行库少,需 要安装一下微软运行库。

3.2.3 HDFS 文件更名和移动

1
2
3
4
5
6
7
8
9
10
11
12
13
@Test
public void testRename() throws IOException, InterruptedException,
URISyntaxException {
// 1 获取文件系统
Configuration configuration = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:8020"),
configuration, "root");
// 2 修改文件名称
fs.rename(new Path("/xiyou/huaguoshan/sunwukong.txt"), new
Path("/xiyou/huaguoshan/meihouwang.txt"));
// 3 关闭资源
fs.close();
}

3.2.4 HDFS 删除文件和目录

1
2
3
4
5
6
7
8
9
10
11
12
@Test
public void testDelete() throws IOException, InterruptedException,
URISyntaxException{
// 1 获取文件系统
Configuration configuration = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:8020"),
configuration, "root");
// 2 执行删除
fs.delete(new Path("/xiyou"), true);
// 3 关闭资源
fs.close();
}

3.2.5 HDFS 文件详情查看

  • 查看文件名称、权限、长度、块信息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Test
public void testListFiles() throws IOException, InterruptedException,
URISyntaxException {
// 1 获取文件系统
Configuration configuration = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:8020"),
configuration, "root");
// 2 获取文件详情
RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"),
true);
while (listFiles.hasNext()) {
LocatedFileStatus fileStatus = listFiles.next();
System.out.println("========" + fileStatus.getPath() + "=========");
System.out.println(fileStatus.getPermission());
System.out.println(fileStatus.getOwner());
System.out.println(fileStatus.getGroup());
System.out.println(fileStatus.getLen());
System.out.println(fileStatus.getModificationTime());
System.out.println(fileStatus.getReplication());
System.out.println(fileStatus.getBlockSize());
System.out.println(fileStatus.getPath().getName());
// 获取块信息
BlockLocation[] blockLocations = fileStatus.getBlockLocations();
System.out.println(Arrays.toString(blockLocations));
}
// 3 关闭资源
fs.close();
}

3.2.6 HDFS 文件和文件夹判断

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Test
public void testListStatus() throws IOException, InterruptedException,
URISyntaxException{
// 1 获取文件配置信息
Configuration configuration = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:8020"),
configuration, "root");
// 2 判断是文件还是文件夹
FileStatus[] listStatus = fs.listStatus(new Path("/"));
for (FileStatus fileStatus : listStatus) {
// 如果是文件
if (fileStatus.isFile()) {
System.out.println("f:"+fileStatus.getPath().getName());
}else {
System.out.println("d:"+fileStatus.getPath().getName());
}
}
// 3 关闭资源
fs.close();
}

4. HDFS 的读写流程(面试重点)

4.1 HDFS 写数据流程

4.1.1 剖析文件写入

image-20220128171003637

  • (1)客户端通过 Distributed FileSystem 模块向 NameNode 请求上传文件,NameNode 检查目标文件是否已存在,父目录是否存在。
  • (2)NameNode 返回是否可以上传。
  • (3)客户端请求第一个 Block 上传到哪几个 DataNode 服务器上。
  • (4)NameNode 返回 3 个 DataNode 节点,分别为 dn1、dn2、dn3。
  • (5)客户端通过 FSDataOutputStream 模块请求 dn1 上传数据,dn1 收到请求会继续调用 dn2,然后 dn2 调用 dn3,将这个通信管道建立完成。
  • (6)dn1、dn2、dn3 逐级应答客户端。
  • (7)客户端开始往 dn1 上传第一个 Block(先从磁盘读取数据放到一个本地内存缓存), 以 Packet 为单位,dn1 收到一个 Packet 就会传给 dn2,dn2 传给 dn3;dn1 每传一个 packet 会放入一个应答队列等待应答。
  • (8)当一个 Block 传输完成之后,客户端再次请求 NameNode 上传第二个 Block 的服务 器。(重复执行 3-7 步)。

4.1.2 网络拓扑-节点距离计算

在 HDFS 写数据的过程中,NameNode 会选择距离待上传数据最近距离的 DataNode 接 收数据。那么这个最近距离怎么计算呢?

  • 节点距离:两个节点到达最近的共同祖先的距离总和。

image-20220128171330816

4.1.3 机架感知(副本存储节点选择)

  • 第一个副本在Client所处的节点上。 如果客户端在集群外,随机选一个。
  • 第二个副本在另一个机架的随机 一个节点
  • 第三个副本在第二个副本所在机架的 随机节点

image-20220128204402409

查看对应源码

  • Crtl + n 查找 BlockPlacementPolicyDefault,在该类中查找 chooseTargetInOrder 方法。

image-20220128204317140

4.2 HDFS 读数据流程

image-20220128171418523

  • (1)客户端通过 DistributedFileSystem 向 NameNode 请求下载文件,NameNode 通过查 询元数据,找到文件块所在的 DataNode 地址。
  • (2)挑选一台 DataNode(就近原则,然后随机)服务器,请求读取数据。
  • (3)DataNode 开始传输数据给客户端(从磁盘里面读取数据输入流,以 Packet 为单位 来做校验)。
  • (4)客户端以 Packet 为单位接收,先在本地缓存,然后写入目标文件。

5. NameNode 和 SecondaryNameNode

5.1 NN 和 2NN 工作机制

  • 思考:NameNode 中的元数据是存储在哪里的?

​ 首先,我们做个假设,如果存储在 NameNode 节点的磁盘中,因为经常需要进行随机访 问,还有响应客户请求,必然是效率过低。因此,元数据需要存放在内存中。但如果只存在 内存中,一旦断电,元数据丢失,整个集群就无法工作了。因此产生在磁盘中备份元数据的 FsImage。

​ 这样又会带来新的问题,当在内存中的元数据更新时,如果同时更新 FsImage,就会导 致效率过低,但如果不更新,就会发生一致性问题,一旦 NameNode 节点断电,就会产生数 据丢失。因此,引入 Edits 文件(只进行追加操作,效率很高)。每当元数据有更新或者添 加元数据时,修改内存中的元数据并追加到 Edits 中。这样,一旦 NameNode 节点断电,可 以通过 FsImage 和 Edits 的合并,合成元数据。

​ 但是,如果长时间添加数据到 Edits 中,会导致该文件数据过大,效率降低,而且一旦 断电,恢复元数据需要的时间过长。因此,需要定期进行 FsImage 和 Edits 的合并,如果这 个操作由NameNode节点完成,又会效率过低。因此,引入一个新的节点SecondaryNamenode, 专门用于 FsImage 和 Edits 的合并。

image-20220128171647896

1)第一阶段:NameNode 启动

  • (1)第一次启动 NameNode 格式化后,创建 Fsimage 和 Edits 文件。如果不是第一次启动,直接加载编辑日志和镜像文件到内存。
  • (2)客户端对元数据进行增删改的请求。
  • (3)NameNode 记录操作日志,更新滚动日志。
  • (4)NameNode 在内存中对元数据进行增删改。

2)第二阶段:Secondary NameNode 工作

  • (1)Secondary NameNode 询问 NameNode 是否需要 CheckPoint。直接带回 NameNode 是否检查结果。
  • (2)Secondary NameNode 请求执行 CheckPoint。
  • (3)NameNode 滚动正在写的 Edits 日志。
  • (4)将滚动前的编辑日志和镜像文件拷贝到 Secondary NameNode。
  • (5)Secondary NameNode 加载编辑日志和镜像文件到内存,并合并。
  • (6)生成新的镜像文件 fsimage.chkpoint。
  • (7)拷贝 fsimage.chkpoint 到 NameNode。
  • (8)NameNode 将 fsimage.chkpoint 重新命名成 fsimage。

5.2 Fsimage 和 Edits 解析

第 6 章 DataNode

6.1 DataNode 工作机制

image-20220128210215430

  • (1)一个数据块在 DataNode 上以文件形式存储在磁盘上,包括两个文件,一个是数据 本身,一个是元数据包括数据块的长度,块数据的校验和,以及时间戳。
  • (2)DataNode 启动后向 NameNode 注册,通过后,周期性(6 小时)的向 NameNode 上 报所有的块信息。
  • DN 向 NN 汇报当前解读信息的时间间隔,默认 6 小时;
1
2
3
4
5
<property>
<name>dfs.blockreport.intervalMsec</name>
<value>21600000</value>
<description>Determines block reporting interval in milliseconds.</description>
</property>
  • DN 扫描自己节点块信息列表的时间,默认 6 小时
1
2
3
4
5
6
<property>
<name>dfs.datanode.directoryscan.interval</name>
<value>21600s</value>
<description>Interval in seconds for Datanode to scan datadirectories and reconcile the difference between blocks in memory and on the disk.Support multiple time unit suffix(case insensitive), as described in dfs.heartbeat.interval.
</description>
</property>
  • (3)心跳是每 3 秒一次,心跳返回结果带有 NameNode 给该 DataNode 的命令如复制块 数据到另一台机器,或删除某个数据块。如果超过 10 分钟没有收到某个 DataNode 的心跳, 则认为该节点不可用。

  • (4)集群运行中可以安全加入和退出一些机器。

6.2 数据完整性

思考:如果电脑磁盘里面存储的数据是控制高铁信号灯的红灯信号(1)和绿灯信号(0), 但是存储该数据的磁盘坏了,一直显示是绿灯,是否很危险?同理 DataNode 节点上的数据 损坏了,却没有发现,是否也很危险,那么如何解决呢?

如下是 DataNode 节点保证数据完整性的方法。

  • (1)当 DataNode 读取 Block 的时候,它会计算 CheckSum。
  • (2)如果计算后的 CheckSum,与 Block 创建时值不一样,说明 Block 已经损坏。
  • (3)Client 读取其他 DataNode 上的 Block。
  • (4)常见的校验算法 crc(32),md5(128),sha1(160)
  • (5)DataNode 在其文件创建后周期验证 CheckSum。

image-20220128210538813

6.3 掉线时限参数设置