前言
这周做了个事情趁热沉淀一下。问题很明确治理小文件
。问题由来,要追溯到去年,集群治理
了。之前做到存储
和计算
的管理,后续做了简单hdfs画像
(其中,就有小文件趋势监控)。最近,集群中namenode压力有所显现。于是,针对小文件多的目录进行了排查和治理。进而,有了今天的这个主题ORC小文件合并趣谈。
核心问题
这里,首先治理的是实时导入数据的目录。这里增量数据采用SparkSQL
以动态分区增量写入的方式。众所周知,spark在处理时,每个task
都会写入一个文件(如果task处理的数据,包含n个分区的数据,就会产生n个文件)。进而,在并行度高的情况下,导致对应增量分区文件很多(存储并不大)。
在存储治理
中,平台统一要求将hive表的格式向orc
格式靠拢。orc的表在存储和查询上都有很好的提升。所以,这个问题就间接的转化为解决orc小文件问题。
解决问题
解决问题,就先从根源入手,即sparksql小文件产生源头。在spark 2.4 版本中提供了hit的方式(https://issues.apache.org/jira/browse/SPARK-24940)。
处理上,采用程序升级和定时合并的方式。本文,主要介绍如何定时合并orc文件。
措施
方案对比
经过分析,总结了两种方式。
- 使用ORC原生DDL方式合并小文件功能。
ALTER TABLE table_name [PARTITION partition_spec] CONCATENATE;
附:hive-ddl
优点:
- 原生支持,开发量小。
- 避免了数据的解压、解码过程。
缺点:
- 不够优雅,无法指定最终合并的文件数,需要多次执行。
- 产生一个hivesql处理,中间过程分区目录会产生
.staging-hive*
的文件。 - 比较耗时。
- 重新造轮子,实现文件合并功能。
优点:
- 省时间,直接操作hdfs,省去了hive处理过程。
- 可以控制最终文件数和大小。
缺点:
- 需要一定的开发量。
- 合并后,hive元数据需要主动去刷新处理(直接操作hdfs文件,无法同步到hive元数据)这点很重要。
实现
流程图
- 主线程
main
从元数据库MetaStore
获取需要合并处理(文件数大于1)的分区信息。 - 根据分工不同,使用两个线程池完成异步处理。
mergeHdfsThreadPool
管理合并orc格式的hdfs线程。flushMetastoreThreadPool
管理统计合并分区的元数据信息线程,回馈到元数据库MetaStore
中。
核心代码实现
ORC合并
这里,参照官网Using Core Java。方式,实现简单的文件合并处理。
1 | /** |
文件合并会先写到.merge.working
文件中,合并完成后,再将.merge.working
重命名为正式文件.merge
结尾。最后,将之前的小文件删除。
重新统计分区元数据
采用hive原生的统计方式。StatsDev。
其他注意点
flushMetastoreThreadPool
要在mergeHdfsThreadPool
内的线程结束后执行。如何知道一个线程池内所有线程执行完毕?
线程池的isTerminated
,当所有线程都关闭时,会返回true
。
1 | while(true){ |
结束
寥寥数笔,欢迎交流。