集群优化心酸历程

背景

最近在做公司集群优化工作,现状是公司的离线集群跑着一些实时任务(flink和sparkstreaming)。有时候会因为晚上离线任务起来后,集群资源不够导致实时任务延迟等。为了解决这个问题,部署了一套实时集群,专门用来跑实时任务。从而将实时任务和离线任务进行隔离。

问题

目前,有些spark streaming任务需要读写hdfs或着操作hive表。那么如果要迁移这些任务,就需要满足实时集群可以访问离线集群的存储资源。鉴于这种场景,进行了相关调研。

目标

将离线集群中所有的实时任务迁移至实时集群进行管理。实现计算独立+存储共享,进而实现集群优化的目的。

业界方案

Federation+Viewfs

参考:http://www.searchdoc.cn/hadoop/hadoop.apache.org/docs/r2.9.0/hadoop-project-dist/hadoop-hdfs/ViewFs.com.coder114.cn.html

hadoop 原生的解决方案。

MultipleNamenodes

优点:

  1. 原生自带支持,无需二次开发。
  2. 系统兼容性好。

缺点:

  1. 需要客户端改动很多配置,如果新增节点,处理比较繁琐,扩展不方便。
  2. 需要修改客户端代码,由以前hdfs://xxx/,改为viewfs://xxx/。

Alluxio

官网:http://www.alluxio.org/

Alluxio可以为那些大数据应用提供一个数量级的加速,同时它还提供了通用的数据访问接口。对于底层存储系统,Alluxio连接了大数据应用和传统存储系统之间的间隔,并且重新定义了一组面向数据使用的工作负载程序。

因Alluxio对应用屏蔽了底层存储系统的整合细节,所以任何底层存储系统都可以支撑运行在Alluxio之上的应用和框架。此外Alluxio可以挂载多种底层存储系统,所以它可以作为统一层为任意数量的不同数据源提供服务。
Alluxio

优点:

  1. 易于扩展,支持多种存储。
  2. 可以优化spark的任务性能,加速数据处理。
  3. 代码开源:https://github.com/Alluxio/alluxio。

缺点:

  1. 需要客户端依赖Alluxio,使用Alluxio的api访问hdfs数据。
  2. 对客户端不透明,需要调整数据接口即协议。
  3. 集群需要额外部署Alluxio,增加运维成本。

问题

  1. alluxio如何感知底层文件系统变更?

  2. alluxio如何保证数据的实时性?

  3. 如何保证数据一致性?

二次开发方案

实现一个namenode代理,将客户端请求由逻辑路径路由到物理路径。

目前,了解到今日头条有实现类似的技术。

开源版本:https://github.com/bytedance/nnproxy


实际实施方案

hdfs federation

借鉴hdfs federation的namenode横向扩展能力,以及资源隔离的特性。

hdfs federation

hdfs-site.xml配置

hdfs-site.xml中增加多个nameservice(一个离线,一个实时)。

1
2
3
4
5
6
7
8
9
10
11
<?xml version="1.0" encoding="UTF-8"?>

<!--Autogenerated by Cloudera Manager-->
<configuration>
<property>
<name>dfs.nameservices</name>
<value>nameserviceoffline,nameserviceonline</value>
</property>
(其他配置省略,直接从两个集群的hdfs-site.xml拷贝就好)
......
</configuration>

测试用例

用例 用例
跨集群读hdfs pass
跨集群写hdfs pass
跨集群读hive表 pass
跨集群写hive表 pass

读写hdfs

测试代码
java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class JavaSparkHdfsExample {
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf().setAppName("JavaSparkHdfsExample");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
// JavaStreamingContext sc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
JavaRDD<String> textFile = sc.textFile("hdfs://nameserviceoffline/tmp/zhq/kv1.txt");
JavaPairRDD<String, Integer> counts = textFile
.flatMap(s -> Arrays.asList(s.split(",")).iterator())
.mapToPair(word -> new Tuple2<>(word, 1))
.reduceByKey((a, b) -> a + b);
try {
FileSystem fs = FileSystem.get(new URI("hdfs://nameserviceoffline/tmp/zhq/kv2.txt"),sc.hadoopConfiguration());
fs.delete(new Path("hdfs://nameserviceoffline/tmp/zhq/kv2.txt"),true);
counts.saveAsTextFile("hdfs://nameserviceoffline/tmp/zhq/kv2.txt");
} catch (IOException e) {
e.printStackTrace();
} catch (URISyntaxException e) {
e.printStackTrace();
}
}
}
spark-shell

在实时集群提交

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
$SPARK_HOME/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--executor-memory 512M \
--executor-cores 1 \
--conf spark.executor.extraClassPath=/opt/cloudera/parcels/CDH/jars/htrace-core-3.2.0-incubating.jar \
--conf spark.executorEnv.JAVA_HOME=/usr/zhq/java \
--conf spark.yarn.appMasterEnv.JAVA_HOME=/usr/zhq/java/ \
--conf spark.yarn.dist.files=/home/zhq/conf/hdfs-site.xml
--driver-memory 512M \
--queue root.sale_prod_etl.sla \
--verbose \
--name "sparkdemo" \
--class JavaSparkHdfsExample \
SparkDemo-1.0-SNAPSHOT.jar;

关键针对任务,提交参数中spark.yarn.dist.files 制定配置了federation的hdfs-site.xml。

读写hive表

测试代码
java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class JavaSparkHiveExample {
// $example on:spark_hive$
public static class Record implements Serializable {
private int key;
private String value;
public int getKey() {
return key;
}
public void setKey(int key) {
this.key = key;
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
}
// $example off:spark_hive$
public static void main(String[] args) {
// $example on:spark_hive$
SparkConf sparkConf = new SparkConf().setAppName("JavaSparkHiveExample");
SparkSession spark = SparkSession
.builder()
.config(sparkConf)
.enableHiveSupport()
.getOrCreate();
spark.sql("SELECT * FROM ods.src_test").show();
// Aggregation queries are also supported.
spark.sql("SELECT COUNT(*) FROM ods.src_test").show();

//insert data
spark.sql("insert into ods.src_test select 1,'val_1'");
spark.sql("SELECT COUNT(*) FROM ods.src_test").show();
spark.stop();
}
}
spark-shell
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
$SPARK_HOME/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--executor-memory 512M \
--executor-cores 1 \
--conf spark.executor.extraClassPath=/opt/cloudera/parcels/CDH/jars/htrace-core-3.2.0-incubating.jar \
--conf spark.executorEnv.JAVA_HOME=/usr/zhq/java \
--conf spark.yarn.appMasterEnv.JAVA_HOME=/usr/zhq/java/ \
--conf spark.yarn.dist.files=/home/zhq/conf/hive-site.xml,/home/zhq/conf/hdfs-site.xml,/home/zhq/conf/core-site.xml \
--driver-memory 512M \
--queue root.sale_prod_etl.sla \
--verbose \
--name "sparkdemo" \
--class JavaSparkHiveExample \
SparkDemo-1.0-SNAPSHOT.jar;

这里,同样通过spark.yarn.dist.files 重写spark运行参数。

  • hive-site.xml直接从离线集群拷贝。
  • hdfs-site.xml是上面配置的hdfs federation。
  • core-site.xml直接从离线集群拷贝,这里为了覆盖默认的实时集群的core-site.xml(因为spark hive 在写表的时候,默认是加载集群的core-site.xml中fs.defaultFS找hdfs目录的。当然,也可以直接程序中sparkcontent设置fs.defaultFS)。

总结

遇到问题还是要仔细分析问题本质,然后付诸实践。原理的东西,有时间还是要仔细推敲一下。比如:spark hive 中的HiveClientImpl实现。就会知道大概的一个sql执行过程,延伸到有关sql 解析和执行计划,如何分解成MR任务,然后落表的。很多细节需要理解,路途漫漫。

Alluxio方案也验证过,方案可行,需要注意的问题比较多,出现或多或少的问题主要还是不熟悉,有一定学习成本。

比如:

报错

1
Peer indicated failure: Plain authentication failed: User yarn is not configured for any impersonation. impersonationUser: hdfs

解决:需修改alluxio配置文件alluxio-site.properties,添加如下内容以实现yarn对其他用户的代理权限alluxio.master.security.impersonation.yarn.users=*


后面,会研究下Alluxio,这个“加速器”,前景貌似还不错,也欢迎一起交流讨论。

Alan Zhang wechat
欢迎您扫一扫上面的微信公众号“补愚者说”,订阅我的博客!