# Regain control of your Spark Parallelization

## SparkSQL single-partition optimization

One of my latest endeavors at Anchormen was optimizing a long SparkSQL query that performed several calculations over a large dataset. With the amount of data being handled in the multi-node cluster, the client wasn’t expecting such a long processing time. After digging into it, we discovered that one specific column calculation was at the root of the problem, and it could easily be solved, leading to more than 75% gain in speed!

If you also want to be able to diagnose and solve such problems, then this blog is just for you.

### The Single Partition Problem

When calculating aggregations over a dataset, one can use Window functions to scope the aggregations. Let’s see a short example using the Scala API of Spark (I’m omitting the imports for simplicity’s sake):

```scala
val numbersDf = (1 to 10).toList.toDF("number")

// Calculating the “double” and “sum” columns
numbersDf.withColumn("double", col("number") * 2)
  .withColumn("sum", sum(col("number")).over(Window.orderBy("number")))
  .show
```

```
+------+------+---+
|number|double|sum|
+------+------+---+
|     1|     2|  1|
|     2|     4|  3|
|     3|     6|  6|
|     4|     8| 10|
|     5|    10| 15|
|     6|    12| 21|
|     7|    14| 28|
|     8|    16| 36|
|     9|    18| 45|
|    10|    20| 55|
+------+------+---+
```

Unlike a regular SQL **sum** aggregation (where a *groupBy* clause would be needed), the **sum** function here delivers a different result for each row of the dataset. Also, since it uses an ordered window, we can clearly see that the calculation of each row's value depends on the calculation of all previous values.

What does this dependency mean in terms of parallelization? It means that we cannot parallelize! And Spark even warns us about it. Let's look at its query plan:

```scala
numbersDf.withColumn("double", col("number") * 2)
  .withColumn("sum", sum(col("number")).over(Window.orderBy("number")))
  .explain
```

```
20/03/17 15:27:12 WARN WindowExec: No Partition Defined for Window operation!
Moving all data to a single partition, this can cause serious performance degradation.
== Physical Plan ==
Window [sum(cast(number#3 as bigint)) windowspecdefinition(…) AS sum#40L], [number#3 ASC NULLS FIRST]
+- *(1) Sort [number#3 ASC NULLS FIRST], false, 0
   +- Exchange SinglePartition
      +- LocalTableScan [number#3, double#36]
```

The warning message, which appears in the Scala console or in the Spark logs, already gives us a clue about what is happening: Spark is moving the window calculation into a single data partition, i.e. executing it serially. One option is to avoid this computation completely, but that is entirely dependent on the problem's domain. In this example, we assume we must calculate this **sum** column.
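When the problem's domain does allow scoping the running sum within groups, defining a partition on the window itself avoids the single partition entirely, since each group can be computed independently. A minimal sketch, assuming a hypothetical dataset `df` that also has a `group` column (not part of the example above):

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, sum}

// Hypothetical: partitioning the window by "group" lets Spark spread the
// computation across partitions, keeping the running sum ordered by
// "number" within each group.
val groupedWindow = Window.partitionBy("group").orderBy("number")
val withSum = df.withColumn("sum", sum(col("number")).over(groupedWindow))
```

With a partitioned window, the query plan shows an `Exchange hashpartitioning` instead of `Exchange SinglePartition`, and the warning disappears.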

The Physical Plan also shows the serialization happening. Reading it from bottom to top: Spark first does the **LocalTableScan**, already including the columns *number* and *double*. Then it forces an **Exchange SinglePartition** step so it can **Sort** the *number* column and apply the sum **Window**.

Until now, the **SinglePartition** exchange has been an unavoidable step that Spark performs at the end. But what happens if more columns are calculated after it? Let's add another one and see the effect:

```scala
// Adding a “double_sum” column after the windowing
numbersDf.withColumn("double", col("number") * 2)
  .withColumn("sum", sum(col("number")).over(Window.orderBy("number")))
  .withColumn("double_sum", col("sum") * 2)
  .explain
```

```
== Physical Plan ==
*(2) Project [number#3, double#64, sum#68L, (sum#68L * 2) AS double_sum#72L]
+- Window [sum(cast(number#3 as bigint)) windowspecdefinition(…) AS sum#68L], [number#3 ASC NULLS FIRST]
   +- *(1) Sort [number#3 ASC NULLS FIRST], false, 0
      +- Exchange SinglePartition
         +- LocalTableScan [number#3, double#64]
```

The Physical Plan is still very similar to the first one, with an added **Project** step to calculate the **double_sum** column. But wait a minute: we just saw how Spark does an **Exchange** to **SinglePartition** before that, which means that **double_sum** and every subsequent calculation will also be done serially! Spark optimizations can be very clever depending on the query, but you still have to keep an eye on this effect, because in this case, regardless of how many nodes you have in your cluster, they will all be waiting for one of them to do the whole work alone.

In this simple example the effects of the sequential computation are not noticeable, but in our client's query plan, which included many more calculations over gigabytes of data, it was surely slowing things down. But how do we tell Spark that after the **Window** function is applied it can parallelize the process again?

### Visualizing the problem with SparkUI

Before discussing how to solve this, let’s have a look at the **SparkUI** and how our stages are laid out for our last example. Knowing how to quickly diagnose this problem in **SparkUI** can be very valuable, especially because many environments expose it out-of-the-box.

The first stage (id 4) has 8 tasks (shown at the bottom right of the screenshot), while the second stage (id 5) has only the single serialized task. Also, the **DAG** (Directed Acyclic Graph) visualization shows that only 2 steps are performed in the first stage (the 2 blue blocks) while many more steps are processed in the second stage. Since the later stage is the one executed serially, we might not be using all our computing power here.

In larger queries, the **DAG** might contain many more blocks and this analysis becomes visually more complicated. Notice that we can focus our attention on the **Exchange** blocks, the ones that connect stages. Another way to investigate where, and if, a serialization of the process is happening is to look at the SQL view in **SparkUI** associated with this job:

In this view we see more details of each block of the query, and the **Exchange** block appears only once (instead of once per stage). Hovering over the **Exchange** block reveals that it is a **SinglePartition** from there on, so every downstream calculation will also be done in this same partition, on one node.

### Repartition to the rescue

Now that we have a clear picture of the problem, we already know what we must do: tell Spark to repartition the dataset in order to parallelize the computation again:

```scala
// Repartitioning after the window calculation
numbersDf.withColumn("double", col("number") * 2)
  .withColumn("sum", sum(col("number")).over(Window.orderBy("number")))
  .repartition(4)
  .withColumn("double_sum", col("sum") * 2)
  .explain
```

```
== Physical Plan ==
*(2) Project [number#3, double#174, sum#178L, (sum#178L * 2) AS double_sum#182L]
+- Exchange RoundRobinPartitioning(4)
   +- Window [sum(cast(number#3 as bigint)) windowspecdefinition(…) AS sum#178L], [number#3 ASC NULLS FIRST]
      +- *(1) Sort [number#3 ASC NULLS FIRST], false, 0
         +- Exchange SinglePartition
            +- LocalTableScan [number#3, double#174]
```

And voilà! The plan has an extra **Exchange** step, after which the calculation of **double_sum** is performed. The repartitioning chosen here explicitly sets the number of partitions to 4, but we could also have chosen another strategy, like repartitioning by some column value. Check the documentation for other examples.
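For instance, repartitioning by a column hash-distributes rows with the same value into the same partition, which keeps related rows together for later per-key work. A sketch, where `key` stands in for whichever column makes sense in your data:

```scala
import org.apache.spark.sql.functions.col

// Hash-partition by a column's values; the number of partitions is taken
// from spark.sql.shuffle.partitions unless given explicitly.
val byKey  = df.repartition(col("key"))
val byKey8 = df.repartition(8, col("key"))
```

In the query plan these show up as `Exchange hashpartitioning` instead of the `RoundRobinPartitioning` produced by a plain `repartition(n)`.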

To finish our analysis, let's see how the **DAG** looks in **SparkUI**:

As expected, we now have 2 exchange transitions and 3 stages. The last stage runs in parallel with 3 tasks, instead of the 4 partitions chosen. This is probably due to the small amount of data in the example; Spark might have decided that 3 tasks are enough.
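Tasks and partitions are related but not identical things: the repartitioned dataset really does hold 4 partitions, which we can verify directly (a quick check, reusing the `numbersDf` from the earlier examples):

```scala
// Counting the partitions of the repartitioned dataset
val repartitioned = numbersDf
  .withColumn("sum", sum(col("number")).over(Window.orderBy("number")))
  .repartition(4)

println(repartitioned.rdd.getNumPartitions) // prints 4
```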

The SQL query detail view is even clearer: the sorting and windowing happen in one **SinglePartition** stage, followed by the last **Project** step (the **double_sum** computation) done in parallel thanks to the **RoundRobinPartitioning**.

We have seen how a window aggregation can generate single partitions in the query plan. We have also seen where to look for them and how to instruct Spark to repartition the dataset after the single partition step.

All the functionality shown with the Scala API is also available in the PySpark and SparkSQL (Hive QL) APIs. Now you know how to use them to regain control of parallelization in your Spark jobs. I hope you found this article useful!