一些心酸经历

背景

近期在做spark 运行任务信息采集(便于后续的任务执行分析作准备),遇到一点问题:

  • 通过spark history server(下面统称:SHS)提供的restful api获取application信息,与通过yarn采集的application数据量不一致。
  • 采集几天,偶尔出现SHS restful api无数据返回问题。
1
2
3
4
运行环境
hadoop:hadoop-2.6.0-cdh5.13.0
spark:spark-2.1.1-bin-cdh5.13
jdk:jdk1.8.0_74

处理

数据不一致问题

针对第一个数据不一致问题,主要是采集的维度不同导致。

  • yarn请求api:/ws/v1/cluster/apps?finishedTimeBegin={0}&finishedTimeEnd={1}
  • SHS请求api:/api/v1/applications?minDate=%s&maxDate=%s&status=completed
    yarn以application结束时间采集,shs以application开始时间采集(minDate和maxDate都是和startTime对比)。

spark-2.1.1源码:
spark-2.1.1

spark-3.0源码:
spark-3.0

对比返现,v3.0多了针对application结束时间采集进行获取。

解决

由于平台不会升级spark。所以,在采用restful api采集的时候,minDate 偏移量向前推1-2天。然后再和上一次抓去的applicationId进行去重处理,保证误差最小化。

SHS无数据返回问题

现象

某天返现sparkUI页面无响应,类似下图:

SHS

查看后台采集SHS,也是直接返回了一个html页面内容。推测应该是SHS的jetty无法转发相应的request到指定handler,给了个默认的页面。

从服务器上dump了jvm堆情况:

heap

可以看到此时SHS内存中大部分是org.eclipse.jetty.server.handler.ContextHandlerCollection对象,可能理解。因为SHS内置jetty就是通过ContextHandlerCollection集合存放各种request请求。然后依据不同的uri转发到不同的handler上进行处理。在看下内存占用,大概9.5GB。

第二大的对象是org.apache.spark.deploy.history.HistoryServer,占用内存大概3.3GB。这个主要是实现对eventLog进行解析,封装成sparkUI对象,然后再缓存到cache中。

这里的核心对象sparkUI就是我们再spark web ui上看到的一个spark application的信息。

1
2
3
4
5
6
val environmentListener: EnvironmentListener,
val storageStatusListener: StorageStatusListener,
val executorsListener: ExecutorsListener,
val jobProgressListener: JobProgressListener,
val storageListener: StorageListener,
val operationGraphListener: RDDOperationGraphListener

以上,每一个Listener都是一个监听器,来处理更新application对应的environment、storage、executor、job、stage、task信息。

第三个是JobProgressListener,大概2.1GB。一个spark application对应多个job,一个job对应多个stage,一个stage对应多个task。所以,这个listenter是主要对象。在使用restful api抓去job,stage、task的时候,都依赖这个。

那么,看完了内存分布,在回答问题。之所以api无响应,可能是jetty无法找到对应的handler,无法请求到某个application对应的sparkUI。

解决

我们回到源码开看下,在包:org.apache.spark.status.api.v1 有ApiRootResource类。这个就是api的controller,可以看到很多熟悉的url。

api

截取了部分,看到每个请求都是new了一个封装类,然后通过withSparkUI获取一个sparkUI。这个ui就是从historyserver 缓存cache中拿。

再看看采集SHS的实现
taskv1

比如上面,application、job、stage、task 分成4个异步线程处理。通过上面的分析可知,每次api请求都需要依赖sparkUI对象,而这个对象是从SHS缓存中获取的。如果缓存失效会刷新缓存。
因此,4个异步线程会导致频繁的刷新缓存,导致SHS负载增加。

介于这个原因,优化了采集实现逻辑
taskv2

在采集指定区间application后,通过一个批量采集task完成对job、stage、task信息的抓去。这样,就较少了缓存cache的失效次数。减轻了SHS对eventLog的解析再cache过程。

优化后的效果比较明显,比异步处理快很多。之前4个任务采集一天内的数据要跑2-3小时,改造后也就1小时左右。而且也没出现过SHS无响应的问题。


其实,回过头来想想。在没有认清SHS 实现原理的基础上,本来想异步加快采集处理,结果却适得其反。这些问题其实在设计初就应该刨根问底。

另外,在此期间也对LinkedIn的dr-elephant也做了了解。同样有作业采集,在针对Spark的抓取上,dr-elephant有2套实现,一个是FSFetcher(直接解析eventLog,省去了SHS解析过程);另一个SparkRestClient(采用restful api + log)的方式,先拿到一个区间内的application,然后在看里application信息,难道对应的eventlog。

对比之下,就是将“batchFetcherTask”变成了访问hdfs获取日志,解析日志的过程。进而减轻了SHS的压力。

有兴趣的可以看下源码,比较简洁:spark-fetchers

Alan Zhang wechat
欢迎您扫一扫上面的微信公众号“补愚者说”,订阅我的博客!