背景
最近在做公司集群优化工作,现状是公司的离线集群跑着一些实时任务(flink和sparkstreaming)。有时候会因为晚上离线任务起来后,集群资源不够导致实时任务延迟等。为了解决这个问题,部署了一套实时集群,专门用来跑实时任务。从而将实时任务和离线任务进行隔离。
问题
目前,有些spark streaming任务需要读写hdfs或着操作hive表。那么如果要迁移这些任务,就需要满足实时集群可以访问离线集群的存储资源。鉴于这种场景,进行了相关调研。
目标
将离线集群中所有的实时任务迁移至实时集群进行管理。实现计算独立+存储共享,进而实现集群优化的目的。
业界方案
Federation+Viewfs
hadoop 原生的解决方案。
优点:
- 原生自带支持,无需二次开发。
- 系统兼容性好。
缺点:
- 需要客户端改动很多配置,如果新增节点,处理比较繁琐,扩展不方便。
- 需要修改客户端代码,由以前hdfs://xxx/,改为viewfs://xxx/。
Alluxio
Alluxio可以为那些大数据应用提供一个数量级的加速,同时它还提供了通用的数据访问接口。对于底层存储系统,Alluxio连接了大数据应用和传统存储系统之间的间隔,并且重新定义了一组面向数据使用的工作负载程序。
因Alluxio对应用屏蔽了底层存储系统的整合细节,所以任何底层存储系统都可以支撑运行在Alluxio之上的应用和框架。此外Alluxio可以挂载多种底层存储系统,所以它可以作为统一层为任意数量的不同数据源提供服务。
优点:
- 易于扩展,支持多种存储。
- 可以优化spark的任务性能,加速数据处理。
- 代码开源:https://github.com/Alluxio/alluxio。
缺点:
- 需要客户端依赖Alluxio,使用Alluxio的api访问hdfs数据。
- 对客户端不透明,需要调整数据接口即协议。
- 集群需要额外部署Alluxio,增加运维成本。
问题
alluxio如何感知底层文件系统变更?
alluxio如何保证数据的实时性?
如何保证数据一致性?
二次开发方案
实现一个namenode代理,将客户端请求由逻辑路径路由到物理路径。
目前,了解到今日头条有实现类似的技术。
开源版本:https://github.com/bytedance/nnproxy
实际实施方案
hdfs federation
借鉴hdfs federation的namenode横向扩展能力,以及资源隔离的特性。
hdfs-site.xml配置
hdfs-site.xml中增加多个nameservice(一个离线,一个实时)。1
2
3
4
5
6
7
8
9
10
11<?xml version="1.0" encoding="UTF-8"?>
<!--Autogenerated by Cloudera Manager-->
<configuration>
<property>
<name>dfs.nameservices</name>
<value>nameserviceoffline,nameserviceonline</value>
</property>
(其他配置省略,直接从两个集群的hdfs-site.xml拷贝就好)
......
</configuration>
测试用例
用例 | 用例 |
---|---|
跨集群读hdfs | pass |
跨集群写hdfs | pass |
跨集群读hive表 | pass |
跨集群写hive表 | pass |
读写hdfs
测试代码
java
1 | public class JavaSparkHdfsExample { |
spark-shell
在实时集群提交1
2
3
4
5
6
7
8
9
10
11
12
13
14
15$SPARK_HOME/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--executor-memory 512M \
--executor-cores 1 \
--conf spark.executor.extraClassPath=/opt/cloudera/parcels/CDH/jars/htrace-core-3.2.0-incubating.jar \
--conf spark.executorEnv.JAVA_HOME=/usr/zhq/java \
--conf spark.yarn.appMasterEnv.JAVA_HOME=/usr/zhq/java/ \
--conf spark.yarn.dist.files=/home/zhq/conf/hdfs-site.xml
--driver-memory 512M \
--queue root.sale_prod_etl.sla \
--verbose \
--name "sparkdemo" \
--class JavaSparkHdfsExample \
SparkDemo-1.0-SNAPSHOT.jar;
关键针对任务,提交参数中spark.yarn.dist.files
制定配置了federation的hdfs-site.xml。
读写hive表
测试代码
java
1 | public class JavaSparkHiveExample { |
spark-shell
1 | $SPARK_HOME/bin/spark-submit \ |
这里,同样通过spark.yarn.dist.files
重写spark运行参数。
hive-site.xml
直接从离线集群拷贝。hdfs-site.xml
是上面配置的hdfs federation。core-site.xml
直接从离线集群拷贝,这里为了覆盖默认的实时集群的core-site.xml(因为spark hive 在写表的时候,默认是加载集群的core-site.xml中fs.defaultFS
找hdfs目录的。当然,也可以直接程序中sparkcontent设置fs.defaultFS
)。
总结
遇到问题还是要仔细分析问题本质,然后付诸实践。原理的东西,有时间还是要仔细推敲一下。比如:spark hive 中的HiveClientImpl
实现。就会知道大概的一个sql执行过程,延伸到有关sql 解析和执行计划,如何分解成MR任务,然后落表的。很多细节需要理解,路途漫漫。
Alluxio方案也验证过,方案可行,需要注意的问题比较多,出现或多或少的问题主要还是不熟悉,有一定学习成本。
比如:
报错
1 | Peer indicated failure: Plain authentication failed: User yarn is not configured for any impersonation. impersonationUser: hdfs |
解决:需修改alluxio配置文件alluxio-site.properties,添加如下内容以实现yarn对其他用户的代理权限alluxio.master.security.impersonation.yarn.users=*
后面,会研究下Alluxio
,这个“加速器”,前景貌似还不错,也欢迎一起交流讨论。