In this post, I will share a few quick tips for scaling your Spark applications to larger datasets without resorting to large executor memory.
- Increase Shuffle partitions: The default number of shuffle partitions is 200; for larger datasets, you are better off with a higher number. This helps in two ways: first, it avoids OOM errors on executors because it reduces the size of each shuffle partition; second, it speeds up operations such as hash partitioning because there are more buckets. The config option for spark-submit is
--conf spark.sql.shuffle.partitions=
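For instance, a minimal sketch of setting this on spark-submit or programmatically (the value 1000 is only an illustrative starting point, not a recommendation):
spark-submit --conf spark.sql.shuffle.partitions=1000 ...
// or from code, before running the shuffle-heavy query
spark.conf.set("spark.sql.shuffle.partitions", "1000")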
- Specify the key for partitionBy: By default, rows are partitioned via hash partitioning by computing a hash of the entire row, but you know your queries better than Spark does. For wide datasets, choosing a key identifier column can significantly increase the speed of hashing.
df.repartition(num_partitions, partitionByCols: _*)
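As a concrete sketch, assuming a hypothetical user_id key column (the partition count is also illustrative):
import org.apache.spark.sql.functions.col
// user_id is a placeholder; pick the column your joins and aggregations key on
val repartitioned = df.repartition(1000, col("user_id"))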
If you do not pass the partitioning columns, the DataFrame simply falls back to hash partitioning on the entire row.
- Increase Overhead memory: Overhead memory is used by Spark for interned strings and other off-heap storage. You can increase it to a slightly larger value if you see tasks failing with errors explicitly telling you to raise it. A value of around 12 percent of executor memory usually suffices.
--conf spark.executor.memoryOverhead=
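As a rough worked example, 12 percent of a 25G executor comes to about 3 GB, roughly 3072 MiB (the figures are illustrative, not a recommendation):
--conf spark.executor.memory=25g --conf spark.executor.memoryOverhead=3072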
- Avoid caching datasets, unless necessary: The rule is simple: if you cache a dataset, it takes up memory and storage space (from Spark 2.0, cache is an alias for persist), and that means less memory for other operations such as shuffles.
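If you do need to cache, a small sketch of releasing the memory as soon as the cached data is no longer needed (the path and name are hypothetical):
val lookup = spark.read.orc("/path/to/lookup").cache() // cached because it is reused below
// ... several queries that reuse lookup ...
lookup.unpersist() // free the storage memory for other operations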
- Have smaller and more executors, rather than fewer and larger ones: The importance of this cannot be overstated; keep executor cores and memory small and run more executors. A rule of thumb that I often use is 25G of executor memory and 5 cores per executor, plus 2700mb of overhead memory per executor.
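On a spark-submit command line, that rule of thumb looks roughly like this (the executor count is workload-dependent and purely illustrative):
spark-submit \
  --executor-memory 25g \
  --executor-cores 5 \
  --num-executors 50 \
  --conf spark.executor.memoryOverhead=2700 \
  ...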
- While saving the data, repartition it by a key that will be used for accessing it later.
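A minimal sketch, assuming a hypothetical event_date key that downstream jobs filter and join on (the output path is also a placeholder):
import org.apache.spark.sql.functions.col
df.repartition(col("event_date")) // colocate rows with the same key
  .write
  .partitionBy("event_date")      // lay the files out by the same key
  .orc("/path/to/output")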
- Use efficient file storage formats: The format that gives the best performance is ORC.
- If using ORC, use the native ORC implementation with Spark 2.3.0+.
--conf spark.sql.orc.impl=native
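Equivalently, a sketch of setting it when building the session (the app name is just a placeholder):
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
  .appName("orc-native-example")          // illustrative name
  .config("spark.sql.orc.impl", "native") // use the native ORC reader/writer
  .getOrCreate()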
- Increase ReservedCodeCacheSize: As you may know, Spark uses whole-stage code generation to generate code for DataFrame operations. For wide transformations, the generated code is large, so when the HotSpot compiler tries to compile it into native code, it may produce warnings and/or errors. You would certainly benefit from a larger ReservedCodeCacheSize. Pass this in extraJavaOptions to the executor and driver:
-XX:ReservedCodeCacheSize=
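For example, passed to both driver and executors on spark-submit (250m matches the value suggested next):
spark-submit \
  --conf "spark.driver.extraJavaOptions=-XX:ReservedCodeCacheSize=250m" \
  --conf "spark.executor.extraJavaOptions=-XX:ReservedCodeCacheSize=250m" \
  ...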
250 MB should be fine for most of the generated code.
- GC tuning: You can tune garbage collection to avoid full GCs and reduce GC time. Use the low-pause G1GC collector, more concurrent GC threads and parallel GC threads, and a lower initiating heap occupancy percent. Pass these in extraJavaOptions:
-XX:+UseG1GC -XX:ConcGCThreads= -XX:ParallelGCThreads= -XX:InitiatingHeapOccupancyPercent=30
Note that the number of concurrent GC threads should be less than the number of parallel GC threads.
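Putting it together with the code cache flag above, a hedged spark-submit sketch (the 4/8 thread counts are illustrative; just keep concurrent threads below parallel threads):
spark-submit \
  --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:ConcGCThreads=4 -XX:ParallelGCThreads=8 -XX:InitiatingHeapOccupancyPercent=30 -XX:ReservedCodeCacheSize=250m" \
  --conf "spark.driver.extraJavaOptions=-XX:+UseG1GC -XX:ConcGCThreads=4 -XX:ParallelGCThreads=8 -XX:InitiatingHeapOccupancyPercent=30 -XX:ReservedCodeCacheSize=250m" \
  ...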