It is very common in a Big Data environment to deal with terabytes of data. We may sometimes observe that a save or append action takes a very long time. Sometimes it even fails with the error "No space left on device". This is not an out-of-memory (OOM) error. An OOM error means there is no space left in RAM, whereas "No space left on device" means the disks have filled up and there is no room left on them to accommodate more data.
If you find that a cluster using Spark 2.0.0 takes a long time to append data to an existing dataset and, in particular, all Spark jobs have finished but your command has not, it is because the driver node is moving the output files of tasks from the job temporary directory to the final destination one by one, which is slow with cloud storage. To resolve this issue, set mapreduce.fileoutputcommitter.algorithm.version to 2. This issue does not affect overwriting a dataset or writing data to a new location.
What is the cause?
When Spark appends data to an existing dataset, it uses FileOutputCommitter to manage staging output files and final output files. The behavior of FileOutputCommitter has a direct impact on the performance of jobs that write data.
A FileOutputCommitter has two methods, commitTask and commitJob. Apache Spark 2.0 and higher versions use Apache Hadoop 2, which uses the value of mapreduce.fileoutputcommitter.algorithm.version to control how commitTask and commitJob work. In Hadoop 2, the default value of mapreduce.fileoutputcommitter.algorithm.version is 1. For this version, commitTask moves data generated by a task from the task temporary directory to the job temporary directory and, when all tasks complete, commitJob moves data from the job temporary directory to the final destination [1]. Because the driver does the work of commitJob, this operation can take a long time on cloud storage, and it may look as though your notebook cell is “hanging”. However, when the value of mapreduce.fileoutputcommitter.algorithm.version is 2, commitTask moves data generated by a task directly to the final destination and commitJob is basically a no-op.
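The difference between the two versions can be illustrated with a small local simulation. This is only a sketch: it uses plain Python and the local file system, the function name run_job and the directory layout are hypothetical simplifications, and it is not the Hadoop implementation. It counts how many file moves happen in the task-side commitTask step and how many happen in the driver-side commitJob step.

```python
import os
import tempfile


def run_job(final_dir, num_tasks, algorithm_version):
    """Simulate a write job; return (task_side_moves, driver_side_moves)."""
    # Simplified stand-in for the job temporary directory (an assumption,
    # not the exact layout Hadoop uses).
    job_tmp = os.path.join(final_dir, "_temporary")
    os.makedirs(job_tmp, exist_ok=True)
    task_moves = 0
    driver_moves = 0

    for task_id in range(num_tasks):
        # Each task first writes its output into its own temporary directory.
        task_tmp = os.path.join(job_tmp, f"task_{task_id}")
        os.makedirs(task_tmp)
        name = f"part-{task_id:05d}"
        with open(os.path.join(task_tmp, name), "w") as f:
            f.write("data\n")

        # commitTask: version 1 moves the file to the job temporary directory,
        # version 2 moves it straight to the final destination.
        target = job_tmp if algorithm_version == 1 else final_dir
        os.rename(os.path.join(task_tmp, name), os.path.join(target, name))
        os.rmdir(task_tmp)
        task_moves += 1

    # commitJob: runs on the driver after all tasks have completed.
    if algorithm_version == 1:
        # One move per file, done one by one by a single process.
        for name in sorted(os.listdir(job_tmp)):
            os.rename(os.path.join(job_tmp, name), os.path.join(final_dir, name))
            driver_moves += 1
    # With version 2 there is nothing left to move; only cleanup remains.
    os.rmdir(job_tmp)
    return task_moves, driver_moves


if __name__ == "__main__":
    for version in (1, 2):
        with tempfile.TemporaryDirectory() as final_dir:
            print(version, run_job(final_dir, num_tasks=4, algorithm_version=version))
```

With four tasks, version 1 reports four driver-side moves and version 2 reports none. On a local disk a move is cheap, but on cloud storage each of those driver-side moves is slow, which is why the command appears to hang after all tasks have finished.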
How to use this?
You can set this config by using any of the following methods:
- When you launch your cluster, you can put spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2 in the Spark config.
- In your notebook, you can run %sql set mapreduce.fileoutputcommitter.algorithm.version=2 or spark.conf.set("mapreduce.fileoutputcommitter.algorithm.version", "2") (spark is a SparkSession object provided with Databricks notebooks).
- When you write data using Dataset API, you can set it in the option, i.e. dataset.write.option("mapreduce.fileoutputcommitter.algorithm.version", "2").
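The notebook and per-write methods above could be wrapped in two small PySpark helpers, sketched below. The helper names enable_committer_v2 and append_with_committer_v2 are hypothetical, and the append mode and Parquet output format are assumptions made for illustration; substitute whatever format and path your job actually uses.

```python
COMMITTER_KEY = "mapreduce.fileoutputcommitter.algorithm.version"


def enable_committer_v2(spark):
    """Set algorithm version 2 on an existing SparkSession; return the stored value."""
    spark.conf.set(COMMITTER_KEY, "2")
    return spark.conf.get(COMMITTER_KEY)


def append_with_committer_v2(df, path):
    """Append df to path, passing the setting as an option on this write only."""
    # Parquet is an assumed output format, chosen only for this example.
    (df.write
       .option(COMMITTER_KEY, "2")
       .mode("append")
       .parquet(path))
```

In a notebook you might call enable_committer_v2(spark) once near the top, or call append_with_committer_v2(df, path) for a single write where df is a DataFrame and path is the location of the existing dataset.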