Out Of Memory (OOM) in Spark - Typical causes and resolutions

  

Spark OOM can occur at the driver, an executor, or the node manager.

The error we see is: java.lang.OutOfMemoryError.

This will happen when the required memory exceeds the available memory for a particular operation.

1. Driver OOM - possible causes:

a. Collect

For example, in the program we have:

   val data = inDF.collect()

The collect operation gathers the data from all executors and sends it to the driver. The driver then tries to merge it into a single object, which may be too big to fit into the driver's memory.

This problem can be solved in two ways:

- Setting a proper limit using spark.driver.maxResultSize

- Repartitioning before saving the result to the output file:

  - data.repartition(1).write…

  - With repartition(1) before the write, a single executor (rather than the driver) gathers the data and writes it out (a sketch of both remedies follows this list).
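A minimal sketch of both remedies, assuming a DataFrame named inDF and hypothetical input/output paths:

    import org.apache.spark.sql.SparkSession

    // Sketch only: the input/output paths are hypothetical placeholders.
    val spark = SparkSession.builder()
      .appName("collect-oom-demo")
      // Cap the total size of results collected to the driver, so an oversized
      // collect() fails fast instead of exhausting the driver heap.
      .config("spark.driver.maxResultSize", "2g")
      .getOrCreate()

    val inDF = spark.read.parquet("/data/input")

    // Prefer writing from the executors instead of collect()-ing to the driver:
    inDF.repartition(1)          // a single executor gathers all partitions
        .write
        .mode("overwrite")
        .parquet("/data/output")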

b. Broadcast Join

- The table to be broadcast is first materialized at the driver and then broadcast to the executors, so a large broadcast table can exhaust driver memory.

c. Driver memory configured too low for the application

d. Misconfiguration of spark.sql.autoBroadcastJoinThreshold

The general remedy to avoid OOM at the driver is to write the application in such a way that data is not collected at the driver.

Note: If we get OOM at the driver in Spark SQL, it is usually due to a broadcast join. The remedy is to either increase the driver memory or reduce spark.sql.autoBroadcastJoinThreshold, so that join operations fall back to the more memory-friendly sort-merge join.
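As a rough sketch (assuming an active SparkSession named spark, as in spark-shell), the threshold can be lowered or disabled at runtime:

    // Disable automatic broadcast joins entirely, forcing sort-merge join:
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

    // Or keep broadcasting, but only for genuinely small tables (value in bytes):
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10L * 1024 * 1024)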

2. Executor OOM - possible causes:

i. High concurrency

ii. Big partition in one of the nodes

iii. YARN memory overhead

iv. Incorrect configuration

v. Inefficient queries

vi. GC overhead limit exceeded

vii. Fetch failed

Total memory per executor = total RAM per node / number of executors per node. For example, a node with 64 GB of RAM running 4 executors gives roughly 16 GB per executor, before subtracting OS and overhead.

i. High concurrency:

A disproportionately high number of cores per executor means too many partitions are processed in parallel, each with its own memory overhead; together they can exceed the executor's memory.
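A hedged configuration sketch with example values only; spark.executor.cores must be in place before the application starts (builder config as below, or --conf on spark-submit):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("executor-concurrency-demo")
      .config("spark.executor.memory", "8g")   // heap shared by all tasks in one executor
      .config("spark.executor.cores", "4")     // caps how many tasks run concurrently per executor
      .getOrCreate()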

ii. Big partition in one of the nodes:

This can happen with incremental write/append operations on a file after the block (storage) size configuration has changed. For example, if the file was created with a 128 MB block size that is later changed to 256 MB, the existing blocks remain 128 MB while the new blocks are 256 MB. When that file is read, the executors that process the bigger partitions may throw OOM because of the uneven partition sizes.

To resolve this, we should repartition in the Spark application.

Another possible cause is a big partition resulting from file decompression or the metadata overhead of a file format (e.g. a Parquet file exploding in size when decoded).

This can be handled by fine-tuning spark.sql.shuffle.partitions (see the sketch below).
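A minimal sketch, assuming an active SparkSession named spark, a DataFrame inDF, and example values for the partition count and output path:

    // Even out partition sizes so no single executor gets an oversized partition.
    spark.conf.set("spark.sql.shuffle.partitions", "400")       // example value

    val balanced = inDF.repartition(400)                         // redistribute rows evenly
    balanced.write.mode("overwrite").parquet("/data/balanced")   // hypothetical output path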

iii. YARN memory overhead:

YARN memory overhead (off-heap memory) is used to store Spark internal objects, language-specific objects, thread stacks, and NIO buffers. By default, 0.07 (7%) of the executor memory is allocated as this overhead.

If the application is running on YARN, it can fail when this overhead is exhausted and the container exceeds its memory limit.

Increasing spark.yarn.executor.memoryOverhead in the application configuration should resolve it (see the sketch below).
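A sketch with example values; the overhead must be set before the application starts, either in the builder as below or with --conf on spark-submit (the value is in MB when no unit is given):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("yarn-overhead-demo")
      .config("spark.executor.memory", "8g")
      .config("spark.yarn.executor.memoryOverhead", "2048")   // extra off-heap headroom, in MB
      .getOrCreate()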

iv. Incorrect configuration:

Each application has different memory requirements, so memory settings must be tuned per application.

We also need to make sure that shuffle or intermediate output does not spill to disk.

v. Inefficient queries:

For example, selecting all columns from a Parquet file: each column needs some in-memory column-batch state, so the overhead grows directly with the number of columns selected.

The Catalyst optimizer tries to optimize queries as much as possible, but it cannot help when the queries themselves are written inefficiently.

To reduce the load on executors, filter out as much data as possible and use partition pruning (filters on partition columns) where applicable, which reduces data movement.

If the partitions are big, set spark.sql.shuffle.partitions to an optimal value.

This issue can also be mitigated by setting optimal values for spark.default.parallelism and spark.executor.cores based on the application (a sketch follows).
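A sketch of column and partition pruning; the path, column names, and the partition column event_date are hypothetical:

    import org.apache.spark.sql.functions.col

    // Assumes an active SparkSession named spark.
    val events = spark.read.parquet("/data/events")

    val slim = events
      .select("user_id", "event_type", "event_date")      // read only the columns we need
      .filter(col("event_date") === "2023-01-01")          // partition pruning, if partitioned by event_date

    slim.groupBy("event_type").count().show()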

vi. GC overhead limit exceeded:

The resolution is to increase the executor memory.

Also check the spark.storage.memoryFraction value, which should not exceed 0.6.

vii. Fetch failed:

Increase the executor memory or the number of shuffle partitions.

3. Node manager OOM

a. The node manager runs the external shuffle service.

b. Heavy shuffle may lead to OOM.

Increasing the direct buffer memory of the NodeManager JVM (typically via -XX:MaxDirectMemorySize in its JVM options) should resolve the issue.

4. OOM at driver/executor due to uneven (non-uniform) distribution of the data

a. In a cogroup operation, all the data for a key group is loaded into memory; if the data is skewed or a group is too big to fit into memory, this can run into an OOM error.

 

This can be addressed with the usual data-skew solutions, such as salting the skewed key (a sketch follows).
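Salting is one common skew mitigation (not the only one): the hot key's rows get spread across several partitions. A minimal sketch with hypothetical DataFrames and a made-up key column, assuming an active SparkSession named spark:

    import org.apache.spark.sql.functions._

    // Hypothetical inputs: bigDF is heavily skewed on "key"; smallDF maps keys to labels.
    val bigDF   = spark.range(1000000).select((col("id") % 5).as("key"), col("id").as("value"))
    val smallDF = spark.range(5).select(col("id").as("key"),
                                        concat(lit("label_"), col("id").cast("string")).as("label"))

    val saltBuckets = 16

    // Add a random salt to the skewed side ...
    val saltedBig = bigDF.withColumn("salt", (rand() * saltBuckets).cast("int"))

    // ... and replicate the other side once per salt value so every row still matches.
    val saltedSmall = smallDF.withColumn("salt", explode(array((0 until saltBuckets).map(lit): _*)))

    // Joining on (key, salt) spreads the hot key's rows across saltBuckets partitions.
    val joined = saltedBig.join(saltedSmall, Seq("key", "salt")).drop("salt")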

 
