Caching (Persist) in Apache Spark

Apache Spark 2.4 with Python 3

In this blog, I am going to share how to cache/persist in Spark:

  • Why should we use cache?
  • When should we use it?
  • What are the consequences of using cache?

If you want to start from a clean slate, you can first clear everything cached in your cluster:

  # clear cache in your cluster
  spark.catalog.clearCache()

Why cache() DataFrames/RDDs?

Every time we run an operation on a DataFrame, Spark goes all the way back to the original data store.
This means pulling all the data across the network for every execution.
In many cases, this network I/O is the most expensive part of a job.
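To make this concrete, here is a minimal sketch of the difference (the SparkSession spark, the path, and the requests column are assumptions for illustration, not from the original post):

  # a minimal sketch: the path and the "requests" column are hypothetical
  df = spark.read.parquet("/data/pageviews")

  # without cache(), each action goes back to the source and re-reads the data
  df.count()
  df.filter(df["requests"] > 100).count()

  # with cache(), the second action reuses the cached partitions
  df.cache()
  df.count()                                # materializes the cache
  df.filter(df["requests"] > 100).count()   # served from the cache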



You should definitely cache() RDDs and DataFrames in the following cases:

  • Reusing them in an iterative loop (e.g., ML algorithms; see the sketch after this list).
  • Reusing the RDD multiple times in a single application, job, or notebook.
  • When the upfront cost to regenerate the RDD partitions is high (e.g., reading from HDFS after a complex chain of map(), filter(), etc.). Caching also helps recovery if a Worker node dies.
    Keep in mind that Spark will automatically evict RDD partitions from Workers in an LRU manner. The LRU eviction happens independently on each Worker and depends on the available memory on that Worker.
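A rough sketch of the iterative-reuse case (the SparkContext sc and the input path are assumptions):

  # cache an RDD that a loop will read many times; the path is hypothetical
  points = sc.textFile("/data/points.txt") \
             .map(lambda line: [float(x) for x in line.split(",")]) \
             .cache()

  total = 0.0
  for i in range(10):
      # each pass reads the cached partitions instead of going back to HDFS
      total += points.map(lambda p: sum(p)).sum()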

During the lifecycle of an RDD, RDD partitions may exist in memory or on disk across the cluster depending on available memory.

The Storage tab on the Spark UI shows where partitions exist (memory or disk) across the cluster at any given point in time.

Note that for RDDs, cache() is an alias for persist(StorageLevel.MEMORY_ONLY), which may not be ideal for datasets larger than the available cluster memory. Each RDD partition that is evicted from memory has to be rebuilt from the source (e.g., HDFS, network), which is expensive.

A better option is persist(StorageLevel.MEMORY_AND_DISK), which spills RDD partitions to the Worker's local disk if they are evicted from memory. In that case, rebuilding a partition only requires reading it back from the Worker's local disk, which is relatively fast. A short sketch follows below.
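A minimal sketch of the two options for an RDD (sc and the HDFS path are assumptions):

  from pyspark import StorageLevel

  logs = sc.textFile("hdfs:///data/logs")   # hypothetical path

  # logs.cache() would be persist(StorageLevel.MEMORY_ONLY):
  # evicted partitions are recomputed from HDFS, which is expensive.

  # MEMORY_AND_DISK spills evicted partitions to the Worker's local disk,
  # so rebuilding them is just a local read.
  logs.persist(StorageLevel.MEMORY_AND_DISK)
  logs.count()   # materialize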

How to cache your Spark DataFrame?

  pageviewsDF.cache()

Notice:

  • The cache(..) operation doesn't do anything other than mark a DataFrame as cacheable.
  • And while it does return an instance of DataFrame, it is not technically a transformation or an action.
  • In order to actually cache the data, Spark has to process every single record.
  • As Spark processes every record, the cache is materialized.
  • A very common way to materialize the cache is to execute a count():

  pageviewsDF.cache().count()

  • This count() will take a little longer than normal: it does the usual counting work and also materializes the cache.
  • Now pageviewsDF is cached AND the cache has been materialized.
  • But before you run the count(), check the Spark UI: the Storage tab is still empty even after calling cache() alone. A fuller sketch follows below.
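A short sketch of the lazy behaviour, reusing pageviewsDF from above (is_cached and storageLevel are standard PySpark DataFrame properties):

  pageviewsDF.cache()         # only marks the DataFrame as cacheable, nothing is stored yet
  pageviewsDF.is_cached       # True, but the Storage tab in the Spark UI is still empty

  pageviewsDF.count()         # action: processes every record and materializes the cache
  pageviewsDF.storageLevel    # shows the storage level actually used (MEMORY_AND_DISK by default)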

persist()

  • cache() is just an alias for persist() with the default storage level.
  • Let's take a look at the API docs for persist():

  from pyspark import StorageLevel
  Dataset.persist(..)    // if using Scala
  DataFrame.persist(..)  # if using Python

persist() allows one to specify an additional parameter (storage level) indicating how the data is cached:

  • DISK_ONLY
  • DISK_ONLY_2
  • MEMORY_AND_DISK
  • MEMORY_AND_DISK_2
  • MEMORY_AND_DISK_SER
  • MEMORY_AND_DISK_SER_2
  • MEMORY_ONLY
  • MEMORY_ONLY_2
  • MEMORY_ONLY_SER
  • MEMORY_ONLY_SER_2
  • OFF_HEAP

Note: the default storage level for RDDs is MEMORY_ONLY, for DataFrames it is MEMORY_AND_DISK, and for Streaming it is MEMORY_AND_DISK_2. A sketch of persist() with an explicit storage level follows below.
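A sketch of persist() with an explicit storage level, reusing pageviewsDF (note: in PySpark the *_SER levels are not exposed, since Python data is always stored serialized, so this sticks to levels that exist on the Python side):

  from pyspark import StorageLevel

  pageviewsDF.persist(StorageLevel.DISK_ONLY)
  pageviewsDF.count()          # materialize the cache on disk

  # to switch levels, unpersist first; Spark won't change the level of an
  # already-persisted Dataset
  pageviewsDF.unpersist()
  pageviewsDF.persist(StorageLevel.MEMORY_AND_DISK_2)
  pageviewsDF.count()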

Remember to unpersist the DataFrame when you are done with it:

  pageviewsDF.unpersist()

Illustration

The illustration below was run on my own cluster.

Before cache()

Check the cluster in the Spark UI, under the Storage tab.
Caching (Persist) in Apache Spark - Figure 1
Nothing there.

  • Let's orderBy() and count() a DataFrame and record the runtime.
    Caching (Persist) in Apache Spark - Figure 2

Now let's cache() this DataFrame and run orderBy().count() again.

Caching (Persist) in Apache Spark - Figure 3

  • Check the Spark UI's Storage tab:
    Caching (Persist) in Apache Spark - Figure 4

100% cached in RAM.
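If you want to reproduce the comparison yourself, here is a rough sketch (the "timestamp" column is hypothetical; on a real cluster you would read the timings off the Spark UI or the notebook output):

  import time

  def timed_orderby_count(df):
      start = time.time()
      n = df.orderBy("timestamp").count()
      print("count = %d, took %.1f s" % (n, time.time() - start))

  timed_orderby_count(pageviewsDF)   # first run: reads from the source

  pageviewsDF.cache()
  timed_orderby_count(pageviewsDF)   # also pays the cost of materializing the cache
  timed_orderby_count(pageviewsDF)   # served from memory, noticeably faster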

Conclusion: when you use a large DataFrame multiple times, cache it. Trade a little space for time, and save your time for the gym :)