理论知识
属性名dfs.namenode.name.dir的是namenode的Fsimage文件路径
解析xml缺点:输出xml格式不太方便输出完整的文件路径,csv格式输出要简洁许多。
解析格式
XML
属性 | 类型 | 描述 | 例子 |
---|---|---|---|
id | bigint | ||
name | string | 文件名称 | |
type | string | 数据类型, FILE/DIRECTORY | |
blocks | struct |
blocks:包含多少数据块【文件被切成数据块】 block: 1. 内部的id表示是块id 1. genstamp是一个唯一编号 1. numBytes表示当前数据块的实际大小,fileSize = SUM(numBytes) |
{“block”:[{“genstamp”:1009,”id”:1073741833,”numBytes”:134217728},{“genstamp”:1010,”id”:1073741834,”numBytes”:100828269}]} |
preferredblocksize | bigint | 推荐每一个数据块的大小 | 134217728(128M) |
replication | bigint | 备份数 | |
atime | bigint | 最近一次访问时间 | 1645099637705 |
mtime | bigint | 最近一次修改时间 | 1645099637705 |
nsquota | bigint | type=DIRECTORY时有效,名称配额 限制指定目录下允许的文件和目录的数量 | -1 |
dsquota | bigint | type=DIRECTORY时有效,空间配额 限制该目录下允许的字节数 | -1 |
permission | string | 权限 | root:supergroup:0666 |
storagepolicyid | bigint | 访问策略 | |
xattrs | struct |
{“xattr”:{“name”:”hdfs.erasurecoding.policy”,”ns”:”SYSTEM”,”val”:”\\0000;\\0000;\\0000;\\000b;replication”}} |
CSV
属性 | 类型 | 描述 | 例子 |
---|---|---|---|
path | string | 文件完整路径 | |
replication | string | 备份数 | |
modificationtime | string | 最近一次修改时间 | |
accesstime | string | 最近一次访问时间 | |
preferredblocksize | string | 推荐每一个数据块的大小 | |
blockscount | string | 文件占用块数量 | = JsonArray.size(),xml的blocks中 |
filesize | string | 文件大小 | = SUM(numBytes),xml的blocks中 |
nsquota | string | 名称配额 限制指定目录下允许的文件和目录的数量 | |
dsquota | string | 空间配额 限制该目录下允许的字节数 | |
permission | string | -rwxrwxrwt | |
username | string | root | |
groupname | string | hive |
- 提取父目录、目录深度
- 增加cluster, partPath, partDay三个分区字段
- atime, atime后加:00 impala才能识别的日期格式
- 增加 fileSizeStep 来对文件大小进行分类
- 增加 mtimeStep 来对文件修改时间进行分类
统计维度
- 指定目录,查看该目录近2个月的文件数趋势、空间大小趋势
- 指定目录,查看该目录下空目录个数、空文件个数、replicator分布
- 指定目录,超过1年未访问过的文件数量(atime分布:1day, 7day, 14day, 30day, 180day, 1year, 2year, >2year)
- 指定目录,单文件大小分布: <1MB, <64MB, <128MB, <1G, <10G, <50G, >50G
- 指定目录下,哪个最底层目录中文件数量最多 top 10
- 指定目录下,哪个最底层目录中空间大小最多 top 10
- 指定目录下,哪个子目录文件数最多,哪些子目录空间占用最大
- 指定目录下,近7天空间增长最快的子目录
- 指定根目录/则表示集群级别
场景分析
- 数据量涨太快了,集群顶不住了,能不能删点冷数据?
EXCEL即可解决,select * from fsimage_hdfs3
昨天莫名的暴涨,可以看看是哪个业务团队的锅吗?
文件目录那么多,大小文件咋找出来啊?
小文件分为单独文件整个大小较小,占用一个Block如1M;另一种文件是切分后剩下的块较小,如129M,129%128=1M,第二个块较小;
XML方式可以统计上述两种情况;EXCEL只对外提供一个file_size解析,并没有文件组成的块信息详情
- 结合MetaBase可视化
操作
// 获取镜像文件并拷贝到tmp目录下
[root@cdh04-wuxi tmp]# hdfs dfsadmin -fetchImage /tmp
22/06/01 10:36:02 INFO namenode.TransferFsImage: Opening connection to http://cdh05-wuxi.cai-inc.com:9870/imagetransfer?getimage=1&txid=latest
22/06/01 10:36:02 INFO common.Util: Combined time for file download and fsync to all disks took 0.01s. The file download took 0.01s at 579600.00 KB/s. Synchronous (fsync) write to disk of /tmp/fsimage_0000000000104094979 took 0.00s.
// 使用 oiv 将image文件转成xml
[root@cdh04-wuxi tmp]# hdfs oiv -p XML -i /tmp/fsimage_0000000000104094979 -o fsimage.xml
22/06/01 10:43:24 INFO offlineImageViewer.FSImageHandler: Loading 12 strings
代码
将xml方式利用
package per.java.basic;
import cn.hutool.core.collection.CollectionUtil;
import com.alibaba.fastjson.JSON;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
import per.java.basic.entity.BlockInfo;
import per.java.basic.entity.XmlFsImage;
import java.util.*;
import java.util.stream.Collectors;
public class ParseXmlJava {
/**
* @param args [0] 时间周期 单位秒
*/
public static void main(String[] args) throws AnalysisException {
// SparkSession spark = SparkSession
// .builder()
// .appName("Java Spark Hive Example")
// .master("local[*]")
// .enableHiveSupport()
// .getOrCreate();
SparkSession spark = SparkSession
.builder()
.appName("Java Spark Hive Example")
.master("yarn")
.enableHiveSupport()
.getOrCreate();
// // 1。基本信息储存
Dataset<Row> df = spark.read().format("com.databricks.spark.xml")
.option("rowTag", "inode")
.option("nullValue", "")
// .load("/Users/apple/IdeaProjects/beisheng/scala-basic/src/main/scala/per/scala/basic/fsimage20220526.xml");
.load("/tmp/fsimage.xml");
List<BlockInfo> gBlockInfoList = new ArrayList<>();
Map<Long, XmlFsImage> map = new HashMap<>();
df.createTempView("tmp_table");
spark.sql("select atime, blocks.block, dsquota, id, mtime, name, nsquota, permission, preferredBlockSize, replication, storagePolicyId, type, xattrs from tmp_table")
.collectAsList()
.forEach(e -> {
XmlFsImage xmlFsImage = new XmlFsImage();
if (!e.isNullAt(0)) {
xmlFsImage.setAtime(e.getLong(0));
}
if (!e.isNullAt(1)) {
List<Object> list = e.getList(1);
List<BlockInfo> blockInfoList = new ArrayList<>();
Long fileSize = 0L;
for (Object elem : list) {
BlockInfo blockInfo = new BlockInfo();
blockInfo.setId(((GenericRowWithSchema) elem).getAs("id"));
blockInfo.setGenstamp(((GenericRowWithSchema) elem).getAs("genstamp"));
blockInfo.setNumBytes(((GenericRowWithSchema) elem).getAs("numBytes"));
if (!e.isNullAt(3)) {
blockInfo.setFileId(e.getLong(3));
}
blockInfoList.add(blockInfo);
fileSize += blockInfo.getNumBytes();
}
gBlockInfoList.addAll(blockInfoList);
xmlFsImage.setBlockNum(blockInfoList.size());
xmlFsImage.setFileSize(fileSize);
xmlFsImage.setBlocks(JSON.toJSONString(blockInfoList));
}
if (!e.isNullAt(2)) {
xmlFsImage.setDsquota(e.getLong(2));
}
if (!e.isNullAt(3)) {
xmlFsImage.setId(e.getLong(3));
}
if (!e.isNullAt(4)) {
xmlFsImage.setMtime(e.getLong(4));
}
if (!e.isNullAt(5)) {
xmlFsImage.setName(e.getString(5));
}
if (!e.isNullAt(6)) {
xmlFsImage.setNsquota(e.getLong(6));
}
if (!e.isNullAt(7)) {
xmlFsImage.setPermission(e.getString(7));
}
if (!e.isNullAt(8)) {
xmlFsImage.setPreferredBlockSize(e.getLong(8));
}
if (!e.isNullAt(9)) {
xmlFsImage.setReplication(e.getLong(9));
}
if (!e.isNullAt(10)) {
xmlFsImage.setStoragePolicyId(e.getLong(10));
}
if (!e.isNullAt(11)) {
xmlFsImage.setType(e.getString(11));
}
if (!e.isNullAt(12)) {
xmlFsImage.setXattrs(JSON.toJSONString(e.getStruct(12)));
}
map.put(xmlFsImage.getId(), xmlFsImage);
});
Dataset<Row> df2 = spark.read().format("com.databricks.spark.xml")
.option("rowTag", "directory")
.option("nullValue", "")
// .load("/Users/apple/IdeaProjects/beisheng/scala-basic/src/main/scala/per/scala/basic/fsimage20220526.xml");//hdfs://192.169.44.23:9000/tmp/fsimage.xml
.load("/tmp/fsimage.xml"); // hdfs://192.169.44.23:9000
df2.createTempView("table1");
List<Row> rows = spark.sql("select parent, explode(child) as child from table1").collectAsList();
Map<Long, TreeNode> treeNodeMap = new HashMap<>();
for (Row row : rows) {
Long pId = row.getLong(0);
Long cId = row.getLong(1);
TreeNode node = treeNodeMap.computeIfAbsent(pId, e -> new TreeNode(pId, map.get(pId)));
TreeNode cNode = treeNodeMap.computeIfAbsent(cId, e -> new TreeNode(cId, map.get(cId)));
cNode.setParent(node);
node.getChildren().add(cNode);
}
List<TreeNode> topTrees = treeNodeMap.values().stream().filter(e -> e.getParent() == null).collect(Collectors.toList());
System.out.println(topTrees);
for (TreeNode node: topTrees) {
node.getXmlFsImage().setPath("/");
node.getXmlFsImage().setPathLevel(1);
recurseTree(node);
}
List<XmlFsImage> list = new ArrayList<>(map.values());
Dataset<Row> dataFrame = spark.createDataFrame(list, XmlFsImage.class);
dataFrame.show();
dataFrame
.write()
.mode("overwrite")
.saveAsTable("tmp.fsimage_hdfs_file");
spark.createDataFrame(gBlockInfoList, BlockInfo.class)
.write()
.mode("overwrite")
.saveAsTable("tmp.fsimage_hdfs_file_block");
}
/**
* 递归封装 path和level
* @param node
*/
private static void recurseTree(TreeNode node) {
if (CollectionUtil.isEmpty(node.getChildren())) {
return;
}
XmlFsImage nodeXmlFsImage = node.getXmlFsImage();
for (TreeNode child : node.getChildren()) {
String name = child.getXmlFsImage().getName();
if (nodeXmlFsImage.getPathLevel() == 1) {
child.getXmlFsImage().setPath("/" + name);
} else {
child.getXmlFsImage().setPath(nodeXmlFsImage.getPath() + "/" + name);
}
child.getXmlFsImage().setPathLevel(nodeXmlFsImage.getPathLevel() + 1);
recurseTree(child);
}
}
}
Refer
https://edersoncorbari.github.io/tutorials/scala-spark-fsimage/
https://blog.csdn.net/Tybyqi/article/details/86716681