EMR Apache Spark参数调优
调优参考
https://tech.meituan.com/2016/04/29/spark-tuning-basic.html
https://tech.meituan.com/2016/05/12/spark-tuning-pro.html
https://www.alibabacloud.com/help/zh/e-mapreduce/latest/configure-spark-submit-parameters
配置建议
- **内存:**如果将内存设置的很大,要注意GC所产生的消耗。通常推荐每一个Executor的内存<=64 GB。
- HDFS 吞吐量: 如果是进行HDFS读写的作业,建议每个Executor中使用<=5个并发来读写。
- **OSS读写:**如果是进行OSS读写的作业,建议是将Executor分布在不同的ECS上,这样可以将每一个ECS的带宽都用上。例如,有10台ECS,那么就可以配置num-executors=10,并设置合理的内存和并发。
- **非线程安全:**如果作业中使用了非线程安全的代码,则在设置executor-cores的时候需要注意多并发是否会造成作业的不正常。如果会造成作业不正常,推荐设置executor-cores=1。
- **Hadoop/Yarn/OS 守护进程:**当我们使用 Yarn 之类的集群管理器运行 spark 应用程序时,会有几个守护进程在后台运行,例如 NameNode、Secondary NameNode、DataNode、JobTracker 和 TaskTracker。因此,在指定 num-executors 时,我们需要确保留出足够的核心(每个节点约 1 个核心)以使这些守护进程顺利运行。
- **Spark在分配内存:**会在用户设定的内存值上溢出375 MB或7%(取大值)。
- **Yarn分配Container内存:**遵循向上取整的原则,这里也就是需要满足1 GB的整数倍
- **executor-cores数:**通常也都会被设置成和集群的可使用核一致,因为如果设置的太多,CPU会频繁切换,性能并不会提高
资源参数调优
了解完了Spark作业运行的基本原理之后,对资源相关的参数就容易理解了。所谓的Spark资源参数调优,其实主要就是对Spark运行过程中各个使用资源的地方,通过调节各种参数,来优化资源使用的效率,从而提升Spark作业的执行性能。以下参数就是Spark中主要的资源参数,每个参数都对应着作业运行原理中的某个部分,我们同时也给出了一个调优的参考值。
num-executors
- 参数说明:该参数用于设置Spark作业总共要用多少个Executor进程来执行。Driver在向YARN集群管理器申请资源时,YARN集群管理器会尽可能按照你的设置来在集群的各个工作节点上,启动相应数量的Executor进程。这个参数非常之重要,如果不设置的话,默认只会给你启动少量的Executor进程,此时你的Spark作业的运行速度是非常慢的。
- 参数调优建议:每个Spark作业的运行一般设置50~100个左右的Executor进程比较合适,设置太少或太多的Executor进程都不好。设置的太少,无法充分利用集群资源;设置的太多的话,大部分队列可能无法给予充分的资源。
executor-memory
- 参数说明:该参数用于设置每个Executor进程的内存。Executor内存的大小,很多时候直接决定了Spark作业的性能,而且跟常见的JVM OOM异常,也有直接的关联。
- 参数调优建议:每个Executor进程的内存设置4G8G较为合适。但是这只是一个参考值,具体的设置还是得根据不同部门的资源队列来定。可以看看自己团队的资源队列的最大内存限制是多少,num-executors乘以executor-memory,是不能超过队列的最大内存量的。此外,如果你是跟团队里其他人共享这个资源队列,那么申请的内存量最好不要超过资源队列最大总内存的1/31/2,避免你自己的Spark作业占用了队列所有的资源,导致别的同学的作业无法运行。
executor-cores
- 参数说明:该参数用于设置每个Executor进程的CPU core数量。这个参数决定了每个Executor进程并行执行task线程的能力。因为每个CPU core同一时间只能执行一个task线程,因此每个Executor进程的CPU core数量越多,越能够快速地执行完分配给自己的所有task线程。
- 参数调优建议:Executor的CPU core数量设置为2~4个较为合适。同样得根据不同部门的资源队列来定,可以看看自己的资源队列的最大CPU core限制是多少,再依据设置的Executor数量,来决定每个Executor进程可以分配到几个CPU core。同样建议,如果是跟他人共享这个队列,那么num-executors * executor-cores不要超过队列总CPU core的1/3~1/2左右比较合适,也是避免影响其他同学的作业运行。
driver-memory
- 参数说明:该参数用于设置Driver进程的内存。
- 参数调优建议:Driver的内存通常来说不设置,或者设置1G左右应该就够了。唯一需要注意的一点是,如果需要使用collect算子将RDD的数据全部拉取到Driver上进行处理,那么必须确保Driver的内存足够大,否则会出现OOM内存溢出的问题。
spark.default.parallelism
- 参数说明:该参数用于设置每个stage的默认task数量。这个参数极为重要,如果不设置可能会直接影响你的Spark作业性能。
- 参数调优建议:Spark作业的默认task数量为500~1000个较为合适。很多同学常犯的一个错误就是不去设置这个参数,那么此时就会导致Spark自己根据底层HDFS的block数量来设置task的数量,默认是一个HDFS block对应一个task。通常来说,Spark默认设置的数量是偏少的(比如就几十个task),如果task数量偏少的话,就会导致你前面设置好的Executor的参数都前功尽弃。试想一下,无论你的Executor进程有多少个,内存和CPU有多大,但是task只有1个或者10个,那么90%的Executor进程可能根本就没有task执行,也就是白白浪费了资源!因此Spark官网建议的设置原则是,设置该参数为num-executors * executor-cores的2~3倍较为合适,比如Executor的总CPU core数量为300个,那么设置1000个task是可以的,此时可以充分地利用Spark集群的资源。
spark.storage.memoryFraction
- 参数说明:该参数用于设置RDD持久化数据在Executor内存中能占的比例,默认是0.6。也就是说,默认Executor 60%的内存,可以用来保存持久化的RDD数据。根据你选择的不同的持久化策略,如果内存不够时,可能数据就不会持久化,或者数据会写入磁盘。
- 参数调优建议:如果Spark作业中,有较多的RDD持久化操作,该参数的值可以适当提高一些,保证持久化的数据能够容纳在内存中。避免内存不够缓存所有的数据,导致数据只能写入磁盘中,降低了性能。但是如果Spark作业中的shuffle类操作比较多,而持久化操作比较少,那么这个参数的值适当降低一些比较合适。此外,如果发现作业由于频繁的gc导致运行缓慢(通过spark web ui可以观察到作业的gc耗时),意味着task执行用户代码的内存不够用,那么同样建议调低这个参数的值。
spark.shuffle.memoryFraction
- 参数说明:该参数用于设置shuffle过程中一个task拉取到上个stage的task的输出后,进行聚合操作时能够使用的Executor内存的比例,默认是0.2。也就是说,Executor默认只有20%的内存用来进行该操作。shuffle操作在进行聚合时,如果发现使用的内存超出了这个20%的限制,那么多余的数据就会溢写到磁盘文件中去,此时就会极大地降低性能。
- 参数调优建议:如果Spark作业中的RDD持久化操作较少,shuffle操作较多时,建议降低持久化操作的内存占比,提高shuffle操作的内存占比比例,避免shuffle过程中数据过多时内存不够用,必须溢写到磁盘上,降低了性能。此外,如果发现作业由于频繁的gc导致运行缓慢,意味着task执行用户代码的内存不够用,那么同样建议调低这个参数的值。
资源参数的调优,没有一个固定的值,需要同学们根据自己的实际情况(包括Spark作业中的shuffle操作数量、RDD持久化操作数量以及spark web ui中显示的作业gc情况),同时参考本篇文章中给出的原理以及调优建议,合理地设置上述参数。
设置Spark-Submit的参数
配置计算
- 基于上述建议,让我们为每个执行程序分配 5 个核心 =>
--executor-cores = 5
(以获得良好的 HDFS 吞吐量) - 每个节点为 Hadoop/Yarn 守护进程保留 1 个核心 => 每个节点可用的核心数 = 16-1 = 15
- 因此,集群中可用的内核总数 = 15 x 30 = 450
Number of available executors = (total cores/num-cores-per-executor) = 450/5 = 90
- 每个节点的执行者数量 = 90/30 = 3
- 每个执行程序的分配内存 64GB * 1/2 = 32GB / 3 = 10GB,计算堆开销 = 10GB 的 7% = 2GB。所以,实际
--executor-memory
= 10 - 2 = 8GB
集群配置
- Worker节点 x 30 台
- 16核64 GiB 1000 GB高效云盘
- 30台
提交作业
默认配置(按照集群队列资源一半计算)
--master yarn \
--deploy-mode client \
--driver-memory 1g \
--num-executors 45 \
--executor-memory 4g \
--executor-cores 5 \
--conf spark.default.parallelism=500 \
--conf spark.sql.shuffle.partitions=500 \
运行大任务(比如号码、打点数据)
--master yarn \
--deploy-mode client \
--num-executors 90 \
--executor-memory 8g \
--executor-cores 5 \
--conf spark.default.parallelism=900 \
--conf spark.sql.shuffle.partitions=900 \
其他参数
如果Spark作业中的RDD持久化操作较少,shuffle操作较多时
--conf spark.shuffle.memoryFraction=0.4 \
--conf spark.storage.memoryFraction=0.4 \
如果Spark作业中有较多的RDD持久化操作
--conf spark.shuffle.memoryFraction=0.2 \
--conf spark.storage.memoryFraction=0.7 \
如果Spark作业中包含spark sql,且shuffle操作较多时
--conf spark.sql.shuffle.partitions=500
如果需要使用collect算子将RDD的数据全部拉取到Driver上进行处理
--driver-memory 4g