Spark comes with a plethora of settings, and good luck trying to make sense of them and choosing their values. There is a ton of documentation, but not one good page describing, step by step, how to choose the values of important parameters like:
- number of executors
- number of cores per executor
- executor memory
- executor memory overhead
- spark dynamicAllocation
Spark processes a job in stages. Your goal as a developer is to minimize the # of stages, since data needs to be shuffled between stages and that is an expensive operation; also, stages cannot be executed in parallel – one stage has to complete before the next one can start. Within a stage, Spark processes data in tasks, so N tasks need to be completed to complete a stage. Tasks can be executed in parallel. A task is processed by a thread, and a task processes one partition of the data. An executor is a JVM instance (i.e., it's a process) that lives for the lifetime of the Spark job and executes tasks. Just as multiple threads live in a process and share the process memory, multiple tasks live in an executor and share the executor memory. A task has a 1:1 relationship with a thread (a task is executed in a thread) and a thread has a 1:1 relationship with a CPU core (a thread consumes one CPU core when running).
In general, it is recommended to run 2-3 tasks per CPU core in the cluster [link]. We can start with the total number of virtual cores available in the cluster. Let's say that number is 500 and we want to use 80% of the cluster's capacity. This means we want to use a total of 0.8 * 500 = 400 VCores. That settles one number. The total # of tasks is then a number between 2 and 3 times 400; taking 2.5 gives 2.5 * 400 = 1000. That settles another number. Since a task has a 1:1 relationship with a partition, spark.default.parallelism and spark.sql.shuffle.partitions should be set to 1000.
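As a quick sanity check, the arithmetic above can be written as a few lines of Python. The cluster size, load factor, and tasks-per-core values are just the example numbers from the text:

```python
def target_parallelism(total_vcores, load_factor=0.8, tasks_per_core=2.5):
    """Partitions (= tasks) to aim for, given the cluster's VCore count."""
    usable_cores = int(total_vcores * load_factor)  # VCores we intend to occupy
    return int(usable_cores * tasks_per_core)       # 2-3 tasks per core

# With the example numbers: 0.8 * 500 = 400 cores, then 2.5 * 400 = 1000 tasks
print(target_parallelism(500))  # -> 1000
```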
Sandy Ryza notes that
I’ve noticed that the HDFS client has trouble with tons of concurrent threads. A rough guess is that at most five tasks per executor can achieve full write throughput, so it’s good to keep the number of cores per executor below that number.
Running tiny executors (with a single core and just enough memory needed to run a single task, for example) throws away the benefits that come from running multiple tasks in a single JVM. For example, broadcast variables need to be replicated once on each executor, so many small executors will result in many more copies of the data.
So let's set the number of cores per executor to 5 based on the notes above (a constant which does not need to change based on cluster properties, etc.). Any number between 3 and 5 should be fine in general.
As we have calculated the total # of cores and the # of cores per executor, we can now calculate the # of executors as 400 / 5 = 80.
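Continuing the sketch, the executor count follows by integer division (again using the example numbers and 5 cores per executor):

```python
def num_executors(usable_cores, cores_per_executor=5):
    # Integer division: any leftover cores simply go unused.
    return usable_cores // cores_per_executor

print(num_executors(400))  # -> 80
```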
The executor memory can be set equal to the maximum executor memory provided by the administrator minus the overhead. The default non-heap overhead in Spark is 10% of the executor memory, with a floor of 384MB. One might want to bump this to 1GB if one encounters the “container killed by YARN for exceeding memory limits” error.
If you are using Spark SQL, you might sometimes have to forcefully repartition the dataframe into the # of partitions calculated above. Be careful: in our case Spark would not repartition the data even when we asked it to by calling repartition. To force it to repartition the data, we followed the call to repartition with persist. When Spark has to shuffle data between stages it will use the # of partitions set by spark.default.parallelism or spark.sql.shuffle.partitions, but during the first stage it does not automatically use those settings. And if the # of partitions is only 20 whereas 400 VCores were allocated in spark-submit, only 20 tasks will be active at a time and 380 VCores will sit idle. Don't let this happen! The Executors tab in the Spark UI shows how many tasks are active at a given point in time. Make sure the # of tasks there is equal to the # of VCores – otherwise there are CPUs sitting idle.
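A minimal PySpark sketch of the workaround described above – repartition followed by persist – using the 1000-partition target from earlier. The input path is hypothetical, and this needs a running Spark session, so treat it as illustrative rather than runnable as-is:

```python
from pyspark import StorageLevel
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("/path/to/input")  # hypothetical input

# repartition alone was silently ignored in our case; persisting
# right after it forced Spark to materialize the new partitioning.
df = df.repartition(1000).persist(StorageLevel.MEMORY_AND_DISK)
df.count()  # an action, so the repartition + persist actually execute

print(df.rdd.getNumPartitions())  # should report 1000, not 20
```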
If data is not being cached or persisted, the storage memory fraction can be set to 0. In our experience, Spark dynamic allocation is more of a hassle than it's worth: our jobs would sometimes get stuck when using it, and the time to process becomes more unpredictable. Static allocation translates to a predictable time to process.
So, to summarize:
- number of cores per executor = a constant between 3 and 5 (we used 5 in the example above)
- number of executors = # of cores in cluster * load factor / number of cores per executor
- executor memory = max. memory available per executor – the overhead
- executor memory overhead = defaults to 10% of executor memory (minimum 384MB). Bump it to 1GB if getting “container killed by YARN for exceeding memory limits”
- spark.default.parallelism = 2.5 * # of cores in cluster * load factor
- spark.sql.shuffle.partitions = 2.5 * # of cores in cluster * load factor
- spark.memory.storageFraction = 0 if not doing any caching or persisting
- spark dynamicAllocation = false (gave us more trouble than the promised benefit)
Do use Kryo serialization when running Spark jobs (set spark.serializer to org.apache.spark.serializer.KryoSerializer).
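Putting it all together, here is roughly what the resulting spark-submit invocation could look like for the example cluster above. The memory values are illustrative (assuming the administrator caps containers at 20GB, so 19GB heap + 1GB overhead), and on older Spark-on-YARN versions the overhead setting is spelled spark.yarn.executor.memoryOverhead instead:

```shell
spark-submit \
  --num-executors 80 \
  --executor-cores 5 \
  --executor-memory 19g \
  --conf spark.executor.memoryOverhead=1g \
  --conf spark.default.parallelism=1000 \
  --conf spark.sql.shuffle.partitions=1000 \
  --conf spark.memory.storageFraction=0 \
  --conf spark.dynamicAllocation.enabled=false \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  your-app.jar
```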