orc小文件合并趣谈

前言

这周做了个事情趁热沉淀一下。问题很明确治理小文件。问题由来,要追溯到去年,集群治理了。之前做到存储计算的管理,后续做了简单hdfs画像(其中,就有小文件趋势监控)。最近,集群中namenode压力有所显现。于是,针对小文件多的目录进行了排查和治理。进而,有了今天的这个主题ORC小文件合并趣谈

核心问题

这里,首先治理的是实时导入数据的目录。这里增量数据采用SparkSQL以动态分区增量写入的方式。众所周知,spark在处理时,每个task都会写入一个文件(如果task处理的数据,包含n个分区的数据,就会产生n个文件)。进而,在并行度高的情况下,导致对应增量分区文件很多(存储并不大)。

存储治理中,平台统一要求将hive表的格式向orc格式靠拢。orc的表在存储和查询上都有很好的提升。所以,这个问题就间接的转化为解决orc小文件问题

解决问题

解决问题,就先从根源入手,即sparksql小文件产生源头。在spark 2.4 版本中提供了hit的方式(https://issues.apache.org/jira/browse/SPARK-24940)。

处理上,采用程序升级和定时合并的方式。本文,主要介绍如何定时合并orc文件。

措施

方案对比

经过分析,总结了两种方式。

  • 使用ORC原生DDL方式合并小文件功能。

    ALTER TABLE table_name [PARTITION partition_spec] CONCATENATE;

附:hive-ddl

优点:

  1. 原生支持,开发量小。
  2. 避免了数据的解压、解码过程。

缺点:

  1. 不够优雅,无法指定最终合并的文件数,需要多次执行。
  2. 产生一个hivesql处理,中间过程分区目录会产生.staging-hive*的文件。
  3. 比较耗时。
  • 重新造轮子,实现文件合并功能。

优点:

  1. 省时间,直接操作hdfs,省去了hive处理过程。
  2. 可以控制最终文件数和大小。

缺点:

  1. 需要一定的开发量。
  2. 合并后,hive元数据需要主动去刷新处理(直接操作hdfs文件,无法同步到hive元数据)这点很重要

实现

流程图

流程图

  1. 主线程main从元数据库MetaStore获取需要合并处理(文件数大于1)的分区信息。
  2. 根据分工不同,使用两个线程池完成异步处理。
  3. mergeHdfsThreadPool管理合并orc格式的hdfs线程。
  4. flushMetastoreThreadPool管理统计合并分区的元数据信息线程,回馈到元数据库MetaStore中。

核心代码实现

ORC合并

这里,参照官网Using Core Java。方式,实现简单的文件合并处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
/**
* 合并orc文件
* @param fileDir 需要合并的分区目录
* @throws Exception
*/
public static void orcFileRollUp(String fileDir) throws Exception {
if (StringUtils.isBlank(fileDir)) {
throw new Exception("fileDir is null");
}

fileDir = fileDir.replace(HDFS_HOST,"");

Path srcPath = new Path(fileDir);
if (!fs.exists(srcPath)) {
throw new Exception("fileDir is not exists");
}
if (!fs.isDirectory(srcPath)) {
throw new Exception("fileDir is not directory");
}

FileStatus[] files = fs.listStatus(srcPath);
try {
TypeDescription schema = getSchema(files);
if (schema != null) {
//删除merge.working临时目录
String outFile = fileDir + File.separator + MARGE_FILE_NAME;
Path outMergeFilePath=null;
try {
outMergeFilePath = new Path(outFile);
if (fs.exists(outMergeFilePath)) {
fs.delete(outMergeFilePath, false);
}
} catch (FileNotFoundException e) {

}

Writer writer = OrcFile.createWriter(outMergeFilePath, OrcFile.writerOptions(fs.getConf()).setSchema(schema));
List<Path> delSrcPathList = new ArrayList<>();
for (FileStatus file : files) {
String filePath = file.getPath().toString();
Path srcPathTmp = new Path(filePath);
Reader reader = OrcFile.createReader(srcPathTmp, OrcFile.readerOptions(fs.getConf()));
VectorizedRowBatch batch = reader.getSchema().createRowBatch();
RecordReader rows = reader.rows();
while (rows.nextBatch(batch)) {
if (batch != null) {
writer.addRowBatch(batch);
}
}
rows.close();
delSrcPathList.add(srcPathTmp);
}
writer.close();

//处理合并文件
outFile = fs.getFileStatus(outMergeFilePath).getPath().getName();
if (outFile.endsWith(".working")) {
int lastIndexOf = outFile.lastIndexOf(".working");
outFile = outFile.substring(0, lastIndexOf);
}
Path parent = outMergeFilePath.getParent();
Path newPath = null;
//移除上一次merge文件
try {
newPath = new Path(parent, outFile);
Path oldMergeFile = new Path(fileDir + File.separator + outFile);
if (fs.exists(oldMergeFile)) {
fs.delete(oldMergeFile,false);
}
fs.rename(outMergeFilePath, newPath);
} catch (FileNotFoundException e) {

}

//删除srcPath
for (Path path : delSrcPathList) {
if (path.getName().endsWith("merge")) {
continue;
}
fs.delete(path, false);
}
LOGGER.info("合并分区{}成功,合并文件={}", fileDir,newPath);
}
} catch (Exception e) {
LOGGER.error("合并分区{}失败={}", fileDir,ExceptionUtils.getFullStackTrace(e));
throw new Exception(ExceptionUtils.getFullStackTrace(e));
}
}

文件合并会先写到.merge.working文件中,合并完成后,再将.merge.working重命名为正式文件.merge结尾。最后,将之前的小文件删除。

重新统计分区元数据

采用hive原生的统计方式。StatsDev
StatsDev

其他注意点
  1. flushMetastoreThreadPool要在mergeHdfsThreadPool内的线程结束后执行。如何知道一个线程池内所有线程执行完毕?
    线程池的isTerminated,当所有线程都关闭时,会返回true
1
2
3
4
5
6
while(true){
if(mergeHdfsThreadPool.isTerminated()){
//转交flushMetastoreThreadPool执行
break;
}
}

结束

寥寥数笔,欢迎交流。

Alan Zhang wechat
欢迎您扫一扫上面的微信公众号“补愚者说”,订阅我的博客!