Advanced performance enhancement techniques in Spark.

Design choices:

Language choice

This is impossible to answer and highly depends on your requirements. If you want to perform some single-node machine learning after performing a large ETL job, we might recommend running your Extract, Transform, and Load (ETL) code as SparkR code and then using R’s massive machine learning ecosystem to run your single-node machine learning algorithms. This gives you the best of both worlds and takes advantage of the strengths of both R and Spark without sacrifices. However, when you need to include custom transformations that cannot be expressed in the Structured APIs, R and Python are not necessarily the best choices, simply because of how such code is actually executed. We find that using Python for the majority of the application, and porting some of it to Scala or writing specific UDFs in Scala as your application evolves, is a powerful technique.

DataFrames versus SQL versus Datasets versus RDDs

The answer is simple: across all languages, DataFrames, Datasets, and SQL are equivalent in speed. However, if you’re going to be defining UDFs, you’ll take a performance hit writing those in Python or R, and a somewhat lesser performance hit in Java and Scala. Lastly, if you want to use RDDs, we definitely recommend using Scala or Java.

 

Object Serialization in RDDs:

When you’re working with custom data types, you’re going to want to serialize them using Kryo because it’s both more compact and much more efficient than Java serialization. However, this does come at the inconvenience of registering the classes that you will be using in your application. You can enable Kryo serialization by setting spark.serializer to org.apache.spark.serializer.KryoSerializer. You will also need to explicitly register the classes you want Kryo to handle, either via the spark.kryo.classesToRegister configuration or programmatically, using the SparkConf that you just created and passing in the names of your classes:

conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
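For context, here is a fuller sketch that puts the two steps together. MyClass1 and MyClass2 are hypothetical custom types standing in for your own classes.

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Hypothetical custom types standing in for your own classes
case class MyClass1(id: Long, name: String)
case class MyClass2(values: Seq[Double])

val conf = new SparkConf()
  .setAppName("kryo-example")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

// Register the custom classes with Kryo
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))

val spark = SparkSession.builder().config(conf).getOrCreate()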

 

Cluster Configurations:

Cluster/application sizing

You can refer to the post below on this website to get a clear idea of how to allocate resources to a Spark job:

http://www.bigdatainterview.com/how-allocate-resources-for-a-spark-job-or-resources-allocation-in-spark/

Dynamic memory allocation

Spark provides a mechanism to dynamically adjust the resources your application occupies based on the workload. This means that your application can give resources back to the cluster if they are no longer used, and request them again later when there is demand. This feature is particularly useful if multiple applications share resources in your Spark cluster. This feature is disabled by default and available on all coarse-grained cluster managers; that is, standalone mode, YARN mode, and Mesos coarse-grained mode. If you’d like to enable this feature, you should set spark.dynamicAllocation.enabled to true.
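As a rough sketch, the related settings can be passed when building the session. The executor counts below are illustrative only, and an external shuffle service is usually required so that shuffle files remain available when executors are removed.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("dynamic-allocation-example")
  .config("spark.dynamicAllocation.enabled", "true")
  .config("spark.dynamicAllocation.minExecutors", "2")    // floor the application can shrink to
  .config("spark.dynamicAllocation.maxExecutors", "20")   // ceiling the application can grow to
  .config("spark.shuffle.service.enabled", "true")        // keep shuffle files available when executors are removed
  .getOrCreate()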


 

Scheduling:

Scheduling optimizations do involve some research and experimentation, and unfortunately there are no super-quick fixes beyond setting spark.scheduler.mode to FAIR to allow better sharing of resources across multiple users, or setting --max-executor-cores, which specifies the maximum number of executor cores that your application will need. Specifying this value can ensure that your application does not take up all the resources on the cluster. You can also change the default, depending on your cluster manager, by setting the configuration spark.cores.max to a default of your choice.
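A minimal sketch of these two settings when building the session (the core cap is illustrative, and spark.cores.max applies to standalone and Mesos coarse-grained modes):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("scheduling-example")
  .config("spark.scheduler.mode", "FAIR")   // share resources fairly across concurrent jobs
  .config("spark.cores.max", "16")          // illustrative application-wide cap on cores
  .getOrCreate()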

 

Data at Rest:

This is about saving data in an effective manner so that the next time it is read it can be processed efficiently. This process involves choosing the storage system, file format, compression algorithm, partitioning, and bucketing.

File-based long-term data storage

Generally you should always favor structured, binary types to store your data, especially when you’ll be accessing it frequently. The most efficient file format you can generally choose is Apache Parquet. Parquet stores data in binary files with column-oriented storage, and also tracks some statistics about each file that make it possible to quickly skip data not needed for a query. It is well integrated with Spark through the built-in Parquet data source.
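A minimal sketch of writing and reading Parquet; the schema and paths below are illustrative.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("parquet-example").getOrCreate()
import spark.implicits._

// Illustrative dataset; in practice this would be the output of your ETL
val df = Seq((1, "2021-01-01", 10.0), (2, "2021-01-02", 20.0))
  .toDF("customerId", "date", "amount")

// Binary, column-oriented storage with per-file statistics for data skipping
df.write.mode("overwrite").parquet("/tmp/events_parquet")

// Reading it back goes through the built-in Parquet data source
val events = spark.read.parquet("/tmp/events_parquet")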

Splittable file types and compression

Whatever file format you choose, you should make sure it is “splittable”, which means that different tasks can read different parts of the file in parallel, so that you can take full advantage of parallelism while processing the data.

Splittable: bzip2; codecs such as gzip, snappy, and lz4 are also fine when used inside a splittable container format like Parquet or ORC

Non-splittable: ZIP and TAR archives, and plain gzip-compressed text files

Table partitioning

Storage managers like Apache Hive support this concept, as do many of Spark’s built-in data sources. Partitioning your data correctly allows Spark to skip many irrelevant files when it only requires data with a specific range of keys. For instance, if users frequently filter by “date” or “customerId” in their queries, partition your data by those columns.
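As a sketch, assuming the df and spark from the Parquet example above:

// Partition the stored data by a commonly filtered column ("date" here)
df.write
  .mode("overwrite")
  .partitionBy("date")
  .parquet("/tmp/events_by_date")

// A query that filters on the partition column skips the irrelevant directories entirely
spark.read.parquet("/tmp/events_by_date").where("date = '2021-01-01'").show()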

Bucketing

The essence is that bucketing your data allows Spark to “pre-partition” data according to how joins or aggregations are likely to be performed by readers. This can improve performance and stability because data can be consistently distributed across partitions as opposed to skewed into just one or two. For instance, if joins are frequently performed on a column immediately after a read, you can use bucketing to ensure that the data is well partitioned according to those values. This can help prevent a shuffle before a join and therefore help speed up data access.
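A sketch, again assuming the df from earlier; the bucket count is illustrative. Note that bucketing requires saving as a table, because the bucketing metadata lives in the table catalog.

df.write
  .mode("overwrite")
  .bucketBy(16, "customerId")   // pre-partition by the expected join/aggregation key
  .sortBy("customerId")
  .saveAsTable("events_bucketed")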

The number of files

If there are lots of small files, you’re going to pay a price listing and fetching each of them: the scheduler has to work much harder to locate the data and launch all of the read tasks, which increases the network and scheduling overhead of the job. To control how many records go into each file, you can specify the maxRecordsPerFile option on the write operation.
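For example (the value below is illustrative):

df.write
  .option("maxRecordsPerFile", 50000L)   // cap the number of records per output file
  .mode("overwrite")
  .parquet("/tmp/events_sized")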

 

Parallelism

The first thing you should do whenever trying to speed up a specific stage is to increase the degree of parallelism. In general, we recommend having at least two or three tasks per CPU core in your cluster if the stage processes a large amount of data. You can set this via the spark.default.parallelism property as well as tuning the spark.sql.shuffle.partitions according to the number of cores in your cluster.
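A sketch with illustrative values for, say, a cluster with around 20 cores:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("parallelism-example")
  .config("spark.default.parallelism", "60")     // RDD operations: roughly 2-3 tasks per core
  .config("spark.sql.shuffle.partitions", "60")  // DataFrame/SQL shuffle partitions
  .getOrCreate()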

 

Improved Filtering

Another frequent source of performance enhancements is moving filters to the earliest part of your Spark job that you can. Sometimes, these filters can be pushed into the data sources themselves and this means that you can avoid reading and working with data that is irrelevant to your end result. Enabling partitioning and bucketing also helps achieve this.
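A sketch using the partitioned data from earlier; customers is a hypothetical dimension DataFrame keyed by customerId.

// Filtering before the join lets Spark push the predicate into the Parquet source,
// so irrelevant partitions and row groups are never read
val recent = spark.read.parquet("/tmp/events_by_date")
  .where("date >= '2021-01-01'")

val joined = recent.join(customers, "customerId")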

 

Repartitioning and Coalescing

If you’re reducing the number of overall partitions in a DataFrame or RDD, first try the coalesce method, which will not perform a shuffle but rather merge partitions on the same node into one partition. The slower repartition method will also shuffle data across the network to achieve even load balancing. Repartitioning can be particularly helpful when performing joins or prior to a cache call.
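A sketch, assuming a DataFrame df as before; the partition counts are illustrative.

import org.apache.spark.sql.functions.col

// coalesce: shrink the partition count without a full shuffle
val fewer = df.coalesce(8)

// repartition: full shuffle, but evenly balanced; partitioning by the join key
// ahead of a join or a cache call can pay for itself later
val balanced = df.repartition(200, col("customerId"))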

 

User-Defined Functions (UDFs)

In general, avoiding UDFs is a good optimization opportunity. UDFs are expensive because they force representing data as objects in the JVM and sometimes do this multiple times per record in a query.
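A small illustration of the difference, assuming a DataFrame df with a string column called name:

import org.apache.spark.sql.functions.{col, udf, upper}

// A UDF forces each value out of Spark's internal format and into a JVM object
val upperUdf = udf((s: String) => s.toUpperCase)
df.select(upperUdf(col("name")))

// The equivalent built-in function stays inside the optimized engine
df.select(upper(col("name")))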

 

Temporary Data Storage (Caching)

In applications that reuse the same datasets over and over, one of the most useful optimizations is caching. Caching will place a DataFrame, table, or RDD into temporary storage (either memory or disk) across the executors in your cluster, and make subsequent reads faster.
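A sketch, assuming a DataFrame df that several actions will reuse:

import org.apache.spark.storage.StorageLevel

df.persist(StorageLevel.MEMORY_AND_DISK)   // or simply df.cache() for the default level

df.count()                                 // the first action materializes the cache
df.groupBy("date").count().show()          // later actions read from memory/disk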

 

Joins

Equi-joins are the easiest for Spark to optimize at this point and therefore should be preferred wherever possible. Beyond that, simple things like trying to use the filtering ability of inner joins by changing join ordering can yield large speedups. Additionally, using broadcast join hints can help Spark make intelligent planning decisions when it comes to creating query plans. Avoiding Cartesian joins or even full outer joins is often low-hanging fruit for stability and optimizations.
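A sketch of a broadcast join hint; events and customers are hypothetical DataFrames sharing a customerId column.

import org.apache.spark.sql.functions.broadcast

// Hint that the small dimension table should be broadcast, so the large fact
// table is joined without a shuffle (an equi-join on customerId)
val joined = events.join(broadcast(customers), Seq("customerId"), "inner")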

 

Aggregations

If you’re using RDDs, controlling exactly how these aggregations are performed (e.g., using reduceByKey when possible over groupByKey) can be very helpful and improve the speed and stability of your code.
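A small sketch, assuming an existing SparkContext sc:

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

// groupByKey ships every value across the network before summing
val slower = pairs.groupByKey().mapValues(_.sum)

// reduceByKey combines values on the map side first, shuffling far less data
val faster = pairs.reduceByKey(_ + _)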

 

Broadcast Variables

The basic premise is that if some large piece of data will be used across multiple UDF calls in your program, you can broadcast it to save just a single read-only copy on each node and avoid re-sending this data with each job.
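A sketch, assuming an existing SparkContext sc and a hypothetical ordersRdd of (orderId, countryCode) pairs:

// One read-only copy of the lookup map per executor, instead of shipping it with every task
val countryNames = Map("US" -> "United States", "IN" -> "India")
val bCountries = sc.broadcast(countryNames)

val resolved = ordersRdd.map { case (orderId, code) =>
  (orderId, bCountries.value.getOrElse(code, "Unknown"))
}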
