Caching (Persist) in Apache Spark
Apache Spark 2.4 with Python 3
In this blog post, I am going to cover cache/persist in Spark:
- Why should we use cache?
- When should we use it?
- What are the consequences of using cache?
# clear cache in your cluster
spark.catalog.clearCache()
Why cache() DataFrames/RDDs?
Every time we run an operation on a DataFrame, Spark goes all the way back to the original data store. This requires pulling all the data across the network for every execution. In many (if not most) cases, this network I/O is the most expensive part of a job.
You should definitely cache() RDDs and DataFrames in the following cases:
- You reuse them in an iterative loop (e.g. ML algorithms).
- You reuse an RDD multiple times in a single application, job, or notebook.
- The upfront cost of regenerating the RDD partitions is high (e.g. reading from HDFS, after a complex chain of map(), filter(), etc.). Caching also helps the recovery process if a Worker node dies.
Keep in mind that Spark will automatically evict RDD partitions from Workers in an LRU manner. The LRU eviction happens independently on each Worker and depends on the available memory in the Worker.
During the lifecycle of an RDD, RDD partitions may exist in memory or on disk across the cluster depending on available memory.
The Storage tab on the Spark UI shows where partitions exist (memory or disk) across the cluster at any given point in time.
Note that cache() is an alias for persist(StorageLevel.MEMORY_ONLY), which may not be ideal for datasets larger than the available cluster memory. Each RDD partition that is evicted from memory must be rebuilt from the source (e.g. HDFS, network), which is expensive.
A better option is often persist(StorageLevel.MEMORY_AND_DISK), which spills RDD partitions to the Worker's local disk when they are evicted from memory. In that case, rebuilding a partition only requires pulling data from the Worker's local disk, which is relatively fast.
How to cache your Spark DataFrame?
pageviewsDF.cache()
Notice:
- The cache() operation doesn't do anything other than mark a DataFrame as cacheable. And while it does return an instance of DataFrame, it is not technically a transformation or an action.
- Before you materialize it, check the Spark UI: the Storage tab is still empty even after calling cache() alone.
- In order to actually cache the data, Spark has to process every single record; as it processes each record, the cache is materialized.
- A very common way to materialize the cache is to execute a count():
pageviewsDF.cache().count()
- This count() will take a little longer than normal, because it also has to do the work of materializing the cache.
- Now pageviewsDF is cached AND the cache has been materialized.
persist()
cache() is just an alias for persist(). Let's take a look at the API:
from pyspark import StorageLevel
Dataset.persist(..)   // if using Scala
DataFrame.persist(..) # if using Python
persist() allows one to specify an additional parameter (the storage level) indicating how the data is cached:
DISK_ONLY
DISK_ONLY_2
MEMORY_AND_DISK
MEMORY_AND_DISK_2
MEMORY_AND_DISK_SER
MEMORY_AND_DISK_SER_2
MEMORY_ONLY
MEMORY_ONLY_2
MEMORY_ONLY_SER
MEMORY_ONLY_SER_2
OFF_HEAP
Note: the default storage levels are…
- RDDs: MEMORY_ONLY
- DataFrames: MEMORY_AND_DISK
- Streaming: MEMORY_AND_DISK_2
Remember to unpersist the DataFrame when you are done with it.
pageviewsDF.unpersist()
Illustration
The illustration below was run on my own cluster.
Before cache()
Check the cluster: Spark UI, Storage tab. Nothing there.
- Let's run orderBy() and count() on a DataFrame and record the runtime.
Let's cache() this DataFrame, and run orderBy().count() again.
- Check the Spark UI, Storage tab: 100% cached in RAM.
Conclusion: when using a large DataFrame multiple times, please cache it. Let's trade space for time, and save your time for the gym :)