An overview of spark performance optimisations

Shashank Baravani
10 min readApr 7, 2019

If you are new to Spark and suddenly find yourself trying to optimise a fairly complex application then where do you start? Here is a small write up covering the whole landscape offering starting points for a more a more focussed and detailed analysis later on.

Spark is a beast and tuning it is not trivial but one can break down into focus areas — catalyst, spark apis, executers, resource management, partitioning, data formats, transformations, data serialisation, joins and caching. At the very least this should help prevent getting overwhelmed by the vastness of the subject of exploration.

The very first thing that crops up while trying to optimise spark applications is Project Tungsten of which the spark catalyst is a primary component.

Spark catalyst

Spark catalyst optimiser for Spark SQL creates an highly performant execution plan which is far removed from the queries that we write. Understanding the catalyst is helpful because it helps us in tuning of the optimiser as well as appreciate some of the choices that we will make with regards to many of the spark features. It works in following phases

Phase 1

At the outset spark first creates a logical plan which contains unresolved type references. At its core, catalyst contains a general library for representing trees and applying rules to manipulate them. The logical plan is essentially a tree that can be transformed into another tree by application of rules. Except the last phase while the actual code materialisation happens, all others are are rule based.

Phase 2:

This phase involves creation of a logical plan after resolving type references. Spark SQL uses Catalyst rules and a Catalog object that tracks the tables in all data sources to resolve these attributes.

Phase 3.

This phase involves analysis and optimisation of the logical plan by application of standard rule-based optimisations to the logical plan. These include constant folding, predicate pushdown, projection pruning, null propagation, Boolean expression simplification and other rules.

Phase 4.
This phase involves physical planning for generating multiple plans and comparing them based on cost. In this phase, spark also performs some rule based physical optimisations such as collapsing or pipelining projections or filters into a single map operation. Also, certain operations from the logical plan are pushed to data sources that support predicate or projection pushdown

Phase 5.

In this final phase spark peforms code generation to compile parts of the query to Java bytecode. The catalyst use a Scala feature called Quasiquotes to perform programmatic construction of ASTs (abstract syntax trees) in the Scala, which can then be fed to the Scala compiler at runtime to generate bytecode. A catalyst transforms a tree representing an expression
in SQL to an AST for Scala code to evaluate that expression, and then
compile and run the generated code. Quasiquotes are strongly typed which makes them significantly more useable than string concatenation because the outcome of quasi quotes is a Scala AST instead of running the Scala parser at runtime.

RDDS vs Dataframes vs Datasets

  • RDD: Its useful to go for RDDs when dealing with unstructured data not requiring strongly typed code, volume of data is not huge and performance is not a concern — RDDs are the slowest of all three. Adoption of RDDs is also driven by bias towards functional programming constructs over, say, domain driven design.
  • Dataframes or Row types Generic DataSets— Compared to RDDS, this one is more strongly typed but still generic enough to not mandate a full adherence to a schema. Domain constructs and a high level abstraction makes it easier to deal with data. Fastest among the three.
  • Datasets — This is best used when dealing with data from a fairly complex domain such as Finance or Telecom that has too many entities in the problem domain and complex business operations on these.

Executor sizing:

Based on executer “sizing” and certain configuration it is possible to influence the degree of task concurrency, hdfs throughput, gc overhead, data locality and memory utilisation.

Lets analyse the executer performance with an example. Let say you had 4 nodes, with 6 cores and 64 GB memory per node at your disposal then you could create executers in different combinations

Combination 1: (4 GB + 1 core) -> 16 * 4 = 64 executers
Combination 2: (8 GB + 2 cores) -> 8 * 4 = 32 executers
Combination 3: (16 GB + 4 cores) -> 4 * 4= 16 executers
Combination 4: (32 GB + 8 cores) -> 2 * 4 = 8 executers
Combination 5: (64 GB + 16cores) -> 1 * 4 = 4 executers

Combination 1 => Micro executers

Pros
* Good choice when we need a very large number of non memory intensive executers which are likely to finish fast
Cons
* Reduces degree of concurrency within the same JVM to 1
* Broadcast variables will be duplicated 16 times per node
* Hadoop processes will choke on resources
* Too many open connections (n²) on large clusters

Combination 5=> Fat executers

Pros
* Might be a good choice when we we are doing a highly memory intensive job such as a map side (broadcast) join of two files - one which is very large and the other small enough to fit in memory
* Excellent choice when we want minimally chatty cluster
Cons
* HDFS throughput goes down drastically because the HDFS client has trouble with tons of concurrent threads. At approx five tasks per executor we can achieve full write throughput.
* Results in excessive garbage collection delays.

Combination 3=> Balanced executers

* Enough memory for executers as well as Hadoop processes
* Best of both worlds: parallelism of a fat executor and best throughputs of a tiny executor.
* Less chatty communication pattern due to decrease in open connections.

Resource management

Outliers

Often times there will be “hotspotting” of executers wherein some of the executers will be sharing a far larger workload as compared to others. Here we can use spark dynamic resource allocation, which helps scales the number of executors registered with the application up and down based on the workload. The additional resources can be returned back once the job is done.

spark.dynamicAllocation.enabled = true
spark.shuffle.service.enabled = true
spark.dynamicAllocation.executorIdleTimeout
spark.dynamicAllocation.maxExecutors
spark.dynamicAllocation.minExecutors

Data Locality, dynamic and speculative execution

  • the combination of executers and cores per executer dictates degree of data locality; bigger the executers lesser the data movement over network
  • based on preferences spark can be forced to wait for CPUs cycles to be available rather than transfer data to an idle executer or vice versa

Memory management

  • Is the computation aggregation heavy ? Can in-memory caching fasten the transformation ? Memory allocated to executers and in memory storage can be tweaked to achieve better performance thorough optimal resource usage.
  • Important configs to tune: spark.executor.memory, spark.memory.fraction and spark.memory.storageFraction

Data Format

It suffices to say that spark works best with Parquet especially around following criteria ; I came across this blog which deep dives into how parquet integrates with spark. This is fairly involved subject in itself!

* Partition pruning
* Column projection
* Predicate/filter push-down
* Tools for investigating Parquet metadata
* Tools for measuring Spark metrics

As a reminder here is visual comparison matrix of various formats; mostly self explanatory I believe.

Partitioning

While spark is designed to take advantage of larger numbers of small partitions when dealing with lot of data such as, textfile methods, it can also be used in a way that allows for combining smaller files into a single partition if data is too fragmented, such as wholeTextFile. Based on partition behaviour of the operations applied on the data and the minimum partitions configured, following combinations are possible.

* partition-to-file blocks(1-many): when partition behaviour dictates that multiple small files be consumed by a single partition. This gives the fastest performance since it eliminates overhead that would have been created by an unnecessarily large number of partitions had* partition-to-files(1–1): when partition behaviour dictates that each file itself be given a separate partition. While this is more amenable to parallelism by means of more executers, it is also prone to data skew since there is high probability of data not being distributed uniformly across partitions.* partition-to-file blocks(many-1): when partition behaviour dictates that a large be broken down to enable parallelism then each chunk ends up as a partition. This is the slowest but least prone to data skew.

How do we deal with data skew that arises due to partition behaviour?
By the way, skew can also be introduced via shuffles, especially when joining datasets. Following mitigation strategies can be adopted

  • use salting in partitioning to introduce uniformity in data distribution
  • use coalesce or repartition based on the scenario
  • have separate grouping key or aggregation key
  • have a bucketing column to re-distribute original partition data amongst those buckets

Caching

RDDs face eviction in a LRU fashion during their life cycle whenever there is stress on memory. Therefore it makes sense to “ cache” data if we see following patterns in data processing

  • RDDs being referred in an iterative loop (ie. ML algos)
  • RDD being referred multiple times in a single application, job, or notebook.
  • The initial cost of RDD regeneration is expensive, especially after a complex transformation that involves multiple maps, filters as well as a lot of shuffles. This is especially true if a worker is prone to failure

Options for caching

  • Memory(Native Caching): This in memory based caching works well for small data sets.
  • Storage Caching: A stateless caching mechanism backed by some kind of fast access persistent storage. The first time access could be slow but with time subsequent access can deliver faster results. For e.g. we have external providers like Alluxeo, Ignite, etc which can be plugged into spark
  • Disk(HDFS based caching): This is cheap and fastest if SSDs are used; however it is stateful and data is lost if cluster brought down
  • Memory and disk: This is a hybrid of the first and the third approaches to make the best of both worlds.

Additionally, we can also serialise the data into a byte array and save space by incurring an extra cost of serialising-deserialising.

cache() or persist(StorageLevel.MEMORY_ONLY)
persist(StorageLevel.DISK_ONLY)
persist(StorageLevel.MEMORY_AND_DISK_ONLY)
etc, etc

Transformations

A small intro on various RDDS.

  • ShuffledRDD : ShuffledRDD is created while the data is shuffled over the cluster. If you use any transformation(e.g. join,groupBy,repartition, etc.) which shuffles your data it will create a shuffledRDD.
  • MapPartitionsRDD : MapPartitionsRDD will be created when you use mapPartition transformation.
  • ParallelCollectionRDD : ParallelCollectionRDD is created when you create the RDD with the collection object.
  • CartesianRDD: ParallelCollectionRDD generates a cartesian product of two RDDs. Each element of our first RDD is paired with each element of the second RDD. Therefore, if the cartesian operation is executed on an RDD of types X and an RDD of type Y it will return an RDD that will consist of <X,Y> pairs.

We know that our business logic will dictate the various RDD transformations we will write. Further more, the order in which the transformations are applied will dictate the volume of data shuffled over the network and amount of data processed at each processing stage. Hence careful considering of transformations is absolutely necessary to ensure that we maximise the throughput by avoiding unwanted shuffles and processing.

Joins

Cost Model is where Spark makes the decision on whether to do sort or not or what join to do. For e.g. if query plan indicates that after applying a filter one of the data sets is drastically reduced then its a good candidate for broadcast join. You can also advise spark by using broadcast hint autoBroadCastJoinThreshold based on size of the smaller data set. You can also enable cost based optimiser and also ask spark sql to analyse tables, enable join re ordering, etc.

Serialisation

Spark works well with Kyro serialisation which is far more efficient mechanism as compared to Java serialisation. However when it comes to closures in map functions, trying to serialise all complex references is inefficient; hence it makes sense to use outcomes instead.

For e.g. use rdd.map(_ + sum) instead of rdd.map(_ + a + b + c + d) 
where one or more of a,b,c and d might be complex classes. That way we can avoid transporting serialised copies of a,b,c,d over the network onto the executers which need them to perform the map operations

Of course all of this is in context of RDDs. When it comes to DataSets and DataFrames, due to Project Tungsten, they offer better and more optimised serialisation as they are aware of the actual data types they are working with and the context of computation. For some transformations may require only partial data to be serialised or deserialised e.g. counts or array lookups). This code generation step is a component of Project Tungsten which is a big part of what makes the high-level APIs so performant.

This concludes the post; in the next one we will try to do a deeper dive into some of the topics listed here. Hopefully this gives a good overview of the lay of the land.

--

--