Out Of Memory (OOM) in Spark - Typical causes and resolutions
Spark OOM can occur at the driver, the executors, or the node manager. The error we see is java.lang.OutOfMemoryError. It happens when the memory required by a particular operation exceeds the memory available to it.
1. Driver OOM - possible causes:
a. Collect
For example, the program contains:
val data = inDF.collect()
The collect operation gathers the data from all executors and sends it to the driver. The driver then tries to merge it into a single object, which may be too big to fit into the driver memory.
This problem can be solved in two ways, sketched in code just below this list:
- Setting a proper limit using spark.driver.maxResultSize.
- Repartitioning before saving the result to the output file:
  - data.repartition(1).write…
  - The repartitioned write gathers the data on a single executor task instead of on the driver.
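A minimal sketch of both remedies; the maxResultSize value, the input DataFrame, and the output path are assumptions for illustration, not recommendations.

import org.apache.spark.sql.SparkSession

// Remedy 1: cap the size of results that can be sent back to the driver (2g is an example)
val spark = SparkSession.builder()
  .appName("driver-oom-example")
  .config("spark.driver.maxResultSize", "2g")
  .getOrCreate()

val inDF = spark.read.parquet("/data/input")   // hypothetical input path

// Remedy 2: skip collect() and let a single executor task write the output file
inDF.repartition(1)
  .write
  .mode("overwrite")
  .parquet("/data/output")                     // hypothetical output path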
b. Broadcast Join
The table to be broadcast is first materialized at the driver and then broadcast to the executors.
c. Driver memory configured too low for the application
d. Misconfiguration of spark.sql.autoBroadcastJoinThreshold
The general remedy to avoid OOM at the driver is to write the application in such a way that data is not collected at the driver.
Note: If we get OOM at the driver in Spark SQL, it is usually due to a broadcast join. The remedy is to either increase the driver memory or reduce spark.sql.autoBroadcastJoinThreshold, so that join operations fall back to the memory-friendly Sort Merge Join.
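A sketch of both options; the values are placeholders to tune per application.

// Option 1: give the driver more memory at submit time
//   spark-submit --driver-memory 4g ...

// Option 2: lower the broadcast threshold (in bytes) so large tables are not broadcast
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10485760")   // 10 MB
// or disable automatic broadcast joins entirely, forcing Sort Merge Join
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")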
2. Executor OOM – possible causes:
i. High concurrency
ii. Big partition on one of the nodes
iii. YARN memory overhead
iv. Incorrect configuration
v. Inefficient queries
vi. GC overhead limit exceeded
vii. Fetch failed
Total executor memory = total RAM per node / number of executors per node.
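For example (the numbers are assumptions for illustration): a node with 64 GB of RAM running 4 executors gives at most roughly 64 GB / 4 = 16 GB per executor, before subtracting what the OS, other daemons, and memory overhead take.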
i. High concurrency:
A disproportionately high number of cores per executor can lead to too many partitions being processed in parallel, each with its own memory overhead.
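One way to rein this in is to give each executor fewer cores relative to its memory at submit time; the values below are examples only.

// Fewer cores per executor means fewer partitions processed concurrently in one JVM,
// so each running task gets a larger share of the executor's memory.
spark-submit --executor-cores 4 --executor-memory 8g ...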
ii. Big partition on one of the nodes:
This can happen with incremental write/append operations on a file after the block (storage) size configuration has changed. For example, if the block size was 128 MB when the file was created and is later changed to 256 MB, the existing blocks remain 128 MB while the new blocks are 256 MB. When that file is read for any operation, the executors that process the bigger partitions may throw OOM because of the uneven partition sizes.
To resolve this, repartition the data in the Spark application.
A big partition can also result from file decompression or the metadata overhead of a file format (for example, a Parquet file exploding in size when decoded).
This can be handled by fine-tuning spark.sql.shuffle.partitions. Both remedies are sketched below.
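A minimal sketch of both remedies, reusing the hypothetical inDF from earlier; the partition counts are examples to tune per workload.

// Rebalance oversized input partitions before heavy processing
val balanced = inDF.repartition(400)

// Tune how many partitions are used for shuffles (joins, aggregations)
spark.conf.set("spark.sql.shuffle.partitions", "400")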
iii. YARN memory overhead:
The YARN memory overhead (off-heap memory) is used to store Spark internal objects, language-specific objects, thread stacks, and NIO buffers. By default, a fraction of the executor memory (0.07 in older Spark versions, 0.10 in newer ones, with a 384 MB minimum) is reserved as this overhead.
If the application runs on YARN, it may fail when a container exceeds this overhead allowance.
Configuring the overhead explicitly with spark.yarn.executor.memoryOverhead in the application should resolve it.
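A submit-time example follows; the 2048 MB value is only an assumption to illustrate the shape of the setting, and on Spark 2.3+ the equivalent property is spark.executor.memoryOverhead.

spark-submit --conf spark.yarn.executor.memoryOverhead=2048 ...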
iv. Incorrect configuration:
Each application has different memory requirements, so memory settings need to be tuned per application. We also need to make sure that intermediate data does not spill to disk.
v. Inefficient queries:
An example is selecting all columns from a Parquet file. Each column needs some in-memory column batch state, and this overhead grows directly with the number of columns selected.
The Catalyst optimizer tries to optimize queries as much as it can, but it cannot help when the queries themselves are written inefficiently.
To reduce the load on the executors, filter out as much data as early as possible, using partition pruning (filters on partition columns) where possible, which also reduces data movement (a short sketch follows at the end of this item).
If the partitions are big, set spark.sql.shuffle.partitions to an optimal value.
The issue can also be mitigated by setting optimal values for spark.default.parallelism and spark.executor.cores for the application.
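A minimal pruning sketch, assuming a hypothetical table at /data/events partitioned by event_date with columns user_id and event_type:

// Filter on the partition column first, then select only the needed columns
val events = spark.read.parquet("/data/events")
val slim = events
  .filter("event_date = '2020-01-01'")   // partition pruning: only matching partitions are scanned
  .select("user_id", "event_type")       // avoid selecting all columns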
vi. GC overhead limit exceeded:
The resolution is to increase the executor memory.
Also check the spark.storage.memoryFraction value, which should not exceed 0.6.
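An illustrative combination is shown below; the values are placeholders, and note that spark.storage.memoryFraction only applies to the legacy static memory manager in newer Spark versions.

spark-submit --executor-memory 12g --conf spark.storage.memoryFraction=0.6 ...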
vii. Fetch failed:
Increase the executor memory or the number of shuffle partitions.
3. Node manager OOM
a. The node manager runs the external shuffle service.
b. Heavy shuffle traffic may lead to OOM in the node manager.
Increasing the direct buffer memory of the node manager JVM (for example via the -XX:MaxDirectMemorySize option) should solve the issue.
4. OOM at the driver/executor due to uneven or non-uniform distribution of the data
a. In a cogroup operation, all the data belonging to a key group is loaded into memory; if the data is skewed or too big to fit into memory, this can run into an OOM error.
This can be addressed with the usual data-skew solutions, such as the salting sketch below.
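As one example of a skew mitigation, salting splits a hot key into several sub-keys so that no single group has to fit in memory at once; skewedDF, the key column, and the salt range of 10 are assumptions for illustration.

import org.apache.spark.sql.functions.{col, concat_ws, floor, rand, sum}

// Add a random salt so each key is spread across up to 10 sub-groups
val salted = skewedDF.withColumn(
  "salted_key",
  concat_ws("_", col("key"), floor(rand() * 10).cast("string"))
)

// Aggregate per salted sub-key first, then combine the partial results per original key
val partial = salted.groupBy("key", "salted_key").count()
val result  = partial.groupBy("key").agg(sum("count").as("total_count"))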