理论知识

属性名dfs.namenode.name.dir的是namenode的Fsimage文件路径
解析xml缺点:输出xml格式不太方便输出完整的文件路径,csv格式输出要简洁许多。

解析格式

XML

属性 类型 描述 例子
id bigint

name string 文件名称
type string 数据类型, FILE/DIRECTORY
blocks struct>> blocks:包含多少数据块【文件被切成数据块】
block:
1. 内部的id表示是块id
1. genstamp是一个唯一编号
1. numBytes表示当前数据块的实际大小,fileSize = SUM(numBytes)
{“block”:[{“genstamp”:1009,”id”:1073741833,”numBytes”:134217728},{“genstamp”:1010,”id”:1073741834,”numBytes”:100828269}]}
preferredblocksize bigint 推荐每一个数据块的大小 134217728(128M)
replication bigint 备份数
atime bigint 最近一次访问时间 1645099637705
mtime bigint 最近一次修改时间 1645099637705
nsquota bigint type=DIRECTORY时有效,名称配额 限制指定目录下允许的文件和目录的数量 -1
dsquota bigint type=DIRECTORY时有效,空间配额 限制该目录下允许的字节数 -1
permission string 权限 root:supergroup:0666
storagepolicyid bigint 访问策略
xattrs struct>
{“xattr”:{“name”:”hdfs.erasurecoding.policy”,”ns”:”SYSTEM”,”val”:”\\0000;\\0000;\\0000;\\000b;replication”}}

CSV

属性 类型 描述 例子
path string 文件完整路径
replication string 备份数
modificationtime string 最近一次修改时间
accesstime string 最近一次访问时间
preferredblocksize string 推荐每一个数据块的大小
blockscount string 文件占用块数量 = JsonArray.size(),xml的blocks中
filesize string 文件大小 = SUM(numBytes),xml的blocks中
nsquota string 名称配额 限制指定目录下允许的文件和目录的数量
dsquota string 空间配额 限制该目录下允许的字节数
permission string
-rwxrwxrwt
username string
root
groupname string
hive
  1. 提取父目录、目录深度
  2. 增加cluster, partPath, partDay三个分区字段
  3. atime, atime后加:00 impala才能识别的日期格式
  4. 增加 fileSizeStep 来对文件大小进行分类
  5. 增加 mtimeStep 来对文件修改时间进行分类

统计维度

  • 指定目录,查看该目录近2个月的文件数趋势、空间大小趋势
  • 指定目录,查看该目录下空目录个数、空文件个数、replicator分布
  • 指定目录,超过1年未访问过的文件数量(atime分布:1day, 7day, 14day, 30day, 180day, 1year, 2year, >2year)
  • 指定目录,单文件大小分布: <1MB, <64MB, <128MB, <1G, <10G, <50G, >50G
  • 指定目录下,哪个最底层目录中文件数量最多 top 10
  • 指定目录下,哪个最底层目录中空间大小最多 top 10
  • 指定目录下,哪个子目录文件数最多,哪些子目录空间占用最大
  • 指定目录下,近7天空间增长最快的子目录
  • 指定根目录/则表示集群级别

场景分析

  1. 数据量涨太快了,集群顶不住了,能不能删点冷数据?

EXCEL即可解决,select * from fsimage_hdfs3
image.png

  1. 昨天莫名的暴涨,可以看看是哪个业务团队的锅吗?

  2. 文件目录那么多,大小文件咋找出来啊?

小文件分为单独文件整个大小较小,占用一个Block如1M;另一种文件是切分后剩下的块较小,如129M,129%128=1M,第二个块较小;
XML方式可以统计上述两种情况;EXCEL只对外提供一个file_size解析,并没有文件组成的块信息详情

  1. 结合MetaBase可视化

操作

  1. // 获取镜像文件并拷贝到tmp目录下
  2. [root@cdh04-wuxi tmp]# hdfs dfsadmin -fetchImage /tmp
  3. 22/06/01 10:36:02 INFO namenode.TransferFsImage: Opening connection to http://cdh05-wuxi.cai-inc.com:9870/imagetransfer?getimage=1&txid=latest
  4. 22/06/01 10:36:02 INFO common.Util: Combined time for file download and fsync to all disks took 0.01s. The file download took 0.01s at 579600.00 KB/s. Synchronous (fsync) write to disk of /tmp/fsimage_0000000000104094979 took 0.00s.
  5. // 使用 oiv 将image文件转成xml
  6. [root@cdh04-wuxi tmp]# hdfs oiv -p XML -i /tmp/fsimage_0000000000104094979 -o fsimage.xml
  7. 22/06/01 10:43:24 INFO offlineImageViewer.FSImageHandler: Loading 12 strings

代码

将xml方式利用做path拼接,将blocks字段拆分出来,抽取出block粒度的记录

package per.java.basic;

import cn.hutool.core.collection.CollectionUtil;
import com.alibaba.fastjson.JSON;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
import per.java.basic.entity.BlockInfo;
import per.java.basic.entity.XmlFsImage;

import java.util.*;
import java.util.stream.Collectors;

public class ParseXmlJava {

    /**
    * @param args [0] 时间周期 单位秒
    */
    public static void main(String[] args) throws AnalysisException {
        //        SparkSession spark = SparkSession
        //                .builder()
        //                .appName("Java Spark Hive Example")
        //                .master("local[*]")
        //                .enableHiveSupport()
        //                .getOrCreate();

        SparkSession spark = SparkSession
            .builder()
            .appName("Java Spark Hive Example")
            .master("yarn")
            .enableHiveSupport()
            .getOrCreate();

        //    // 1。基本信息储存
        Dataset<Row> df = spark.read().format("com.databricks.spark.xml")
            .option("rowTag", "inode")
            .option("nullValue", "")
            //                .load("/Users/apple/IdeaProjects/beisheng/scala-basic/src/main/scala/per/scala/basic/fsimage20220526.xml");
            .load("/tmp/fsimage.xml");

        List<BlockInfo> gBlockInfoList = new ArrayList<>();
        Map<Long, XmlFsImage> map = new HashMap<>();
        df.createTempView("tmp_table");
        spark.sql("select atime, blocks.block, dsquota, id, mtime, name, nsquota, permission, preferredBlockSize, replication, storagePolicyId, type, xattrs from tmp_table")
            .collectAsList()
            .forEach(e -> {
                XmlFsImage xmlFsImage = new XmlFsImage();
                if (!e.isNullAt(0)) {
                    xmlFsImage.setAtime(e.getLong(0));
                }
                if (!e.isNullAt(1)) {
                    List<Object> list = e.getList(1);

                    List<BlockInfo> blockInfoList = new ArrayList<>();
                    Long fileSize = 0L;
                    for (Object elem : list) {
                        BlockInfo blockInfo = new BlockInfo();
                        blockInfo.setId(((GenericRowWithSchema) elem).getAs("id"));
                        blockInfo.setGenstamp(((GenericRowWithSchema) elem).getAs("genstamp"));
                        blockInfo.setNumBytes(((GenericRowWithSchema) elem).getAs("numBytes"));
                        if (!e.isNullAt(3)) {
                            blockInfo.setFileId(e.getLong(3));
                        }
                        blockInfoList.add(blockInfo);

                        fileSize += blockInfo.getNumBytes();
                    }
                    gBlockInfoList.addAll(blockInfoList);
                    xmlFsImage.setBlockNum(blockInfoList.size());
                    xmlFsImage.setFileSize(fileSize);
                    xmlFsImage.setBlocks(JSON.toJSONString(blockInfoList));
                }
                if (!e.isNullAt(2)) {
                    xmlFsImage.setDsquota(e.getLong(2));
                }
                if (!e.isNullAt(3)) {
                    xmlFsImage.setId(e.getLong(3));
                }
                if (!e.isNullAt(4)) {
                    xmlFsImage.setMtime(e.getLong(4));
                }
                if (!e.isNullAt(5)) {
                    xmlFsImage.setName(e.getString(5));
                }
                if (!e.isNullAt(6)) {
                    xmlFsImage.setNsquota(e.getLong(6));
                }
                if (!e.isNullAt(7)) {
                    xmlFsImage.setPermission(e.getString(7));
                }
                if (!e.isNullAt(8)) {
                    xmlFsImage.setPreferredBlockSize(e.getLong(8));
                }
                if (!e.isNullAt(9)) {
                    xmlFsImage.setReplication(e.getLong(9));
                }
                if (!e.isNullAt(10)) {
                    xmlFsImage.setStoragePolicyId(e.getLong(10));
                }
                if (!e.isNullAt(11)) {
                    xmlFsImage.setType(e.getString(11));
                }
                if (!e.isNullAt(12)) {
                    xmlFsImage.setXattrs(JSON.toJSONString(e.getStruct(12)));
                }

                map.put(xmlFsImage.getId(), xmlFsImage);
            });

        Dataset<Row> df2 = spark.read().format("com.databricks.spark.xml")
            .option("rowTag", "directory")
            .option("nullValue", "")
            //                .load("/Users/apple/IdeaProjects/beisheng/scala-basic/src/main/scala/per/scala/basic/fsimage20220526.xml");//hdfs://192.169.44.23:9000/tmp/fsimage.xml
            .load("/tmp/fsimage.xml"); // hdfs://192.169.44.23:9000
        df2.createTempView("table1");

        List<Row> rows = spark.sql("select parent, explode(child) as child from table1").collectAsList();

        Map<Long, TreeNode> treeNodeMap = new HashMap<>();
        for (Row row : rows) {
            Long pId = row.getLong(0);
            Long cId = row.getLong(1);

            TreeNode node = treeNodeMap.computeIfAbsent(pId, e -> new TreeNode(pId, map.get(pId)));
            TreeNode cNode = treeNodeMap.computeIfAbsent(cId, e -> new TreeNode(cId, map.get(cId)));
            cNode.setParent(node);
            node.getChildren().add(cNode);
        }

        List<TreeNode> topTrees = treeNodeMap.values().stream().filter(e -> e.getParent() == null).collect(Collectors.toList());
        System.out.println(topTrees);

        for (TreeNode node: topTrees) {
            node.getXmlFsImage().setPath("/");
            node.getXmlFsImage().setPathLevel(1);
            recurseTree(node);
        }

        List<XmlFsImage> list = new ArrayList<>(map.values());
        Dataset<Row> dataFrame = spark.createDataFrame(list, XmlFsImage.class);
        dataFrame.show();
        dataFrame
            .write()
            .mode("overwrite")
            .saveAsTable("tmp.fsimage_hdfs_file");

        spark.createDataFrame(gBlockInfoList, BlockInfo.class)
            .write()
            .mode("overwrite")
            .saveAsTable("tmp.fsimage_hdfs_file_block");
    }

    /**
    * 递归封装 path和level
    * @param node
    */
    private static void recurseTree(TreeNode node) {
        if (CollectionUtil.isEmpty(node.getChildren())) {
            return;
        }

        XmlFsImage nodeXmlFsImage = node.getXmlFsImage();
        for (TreeNode child : node.getChildren()) {
            String name = child.getXmlFsImage().getName();

            if (nodeXmlFsImage.getPathLevel() == 1) {
                child.getXmlFsImage().setPath("/" + name);
            } else {
                child.getXmlFsImage().setPath(nodeXmlFsImage.getPath() + "/" + name);
            }
            child.getXmlFsImage().setPathLevel(nodeXmlFsImage.getPathLevel() + 1);
            recurseTree(child);
        }
    }
}

Refer

https://edersoncorbari.github.io/tutorials/scala-spark-fsimage/
https://blog.csdn.net/Tybyqi/article/details/86716681