
Regain control of your Spark Parallelization

SparkSQL single-partition optimization

One of my latest endeavors at Anchormen was optimizing a long SparkSQL query that performed several calculations over a large dataset. Given the amount of data being handled in the multi-node cluster, the client wasn’t expecting such a long processing time. After digging into it, we discovered that one specific column calculation was at the root of the problem, and that it could easily be solved, leading to a speed gain of more than 75%!

If you also want to be able to diagnose and solve such problems, then this blog is just for you.

The Single Partition Problem

When calculating aggregations over a dataset, one can use Window functions to scope the aggregations. Let’s see a short example using the Scala API of Spark. The snippets are meant for spark-shell, which already provides the spark session and the spark.implicits._ import needed for toDF:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, sum}
val numbersDf = (1 to 10).toList.toDF("number")
// Calculating "double" and "sum" columns
numbersDf.withColumn("double", col("number") * 2)
         .withColumn("sum", sum(col("number")).over(Window.orderBy("number")))
         .show()

+------+------+---+
|number|double|sum|
+------+------+---+
|     1|     2|  1|
|     2|     4|  3|
|     3|     6|  6|
|     4|     8| 10|
|     5|    10| 15|
|     6|    12| 21|
|     7|    14| 28|
|     8|    16| 36|
|     9|    18| 45|
|    10|    20| 55|
+------+------+---+

Unlike a regular SQL sum aggregation (which would need a groupBy clause), the sum function here delivers a different result for each row in the dataset. Also, because it uses an ordered window, we can clearly see that the value in each row depends on the values of all previous rows.
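To make this row-to-row dependency concrete, here is a plain-Scala model of what the ordered window computes. No Spark is involved, so it is only an analogy. scanLeft carries a running total through the sorted rows, so every output needs the one before it. The model assumes distinct values in the ordering column. With ties, Spark’s default RANGE frame would give tied rows the same sum.

```scala
object RunningSum {
  // Models sum(col("number")).over(Window.orderBy("number")) for distinct values:
  // sort the rows, then carry the accumulated total from one row to the next.
  // The accumulator is a Long, mirroring the bigint type Spark uses for sum.
  def runningSum(numbers: Seq[Int]): Seq[Long] =
    numbers.sorted.scanLeft(0L)(_ + _).tail

  def main(args: Array[String]): Unit =
    // Prints the same values as the "sum" column in the table above
    println(runningSum(1 to 10).mkString(", "))
}
```

A sequential fold like this is the shape Spark faces: without a partition specification, it has no independent chunks to hand out to different tasks.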

What does this dependency mean in terms of parallelization? It means that we cannot parallelize it! And Spark even warns us about it. Let’s look at its query plan:

numbersDf.withColumn("double", col("number") * 2)
         .withColumn("sum", sum(col("number")).over(Window.orderBy("number")))
         .explain()

20/03/17 15:27:12 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.

== Physical Plan ==
Window [sum(cast(number#3 as bigint)) windowspecdefinition(…) AS sum#40L],
       [number#3 ASC NULLS FIRST]
+- *(1) Sort [number#3 ASC NULLS FIRST], false, 0
   +- Exchange SinglePartition
      +- LocalTableScan [number#3, double#36]

The warning message, which appears in the Scala console or in the Spark logs, already hints at what is happening: the window calculation is moved to a single data partition, i.e. it is executed serially. One option is to avoid this computation completely, but that depends entirely on the problem’s domain. In this example, we assume we must calculate this sum column.
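The warning specifically complains that no partition is defined for the window. Suppose the domain allowed the running sum to restart for each group of rows. A window defined with Window.partitionBy would then let Spark hash-partition the data by that key and process each group independently.

The sketch below uses a hypothetical group column (even vs. odd numbers) purely for illustration, together with an assumed local session. Note that it changes what sum means. It is only an alternative when the business logic permits it, not a fix for the query above.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, sum}

object PartitionedWindowSketch {
  // Local session with 4 worker threads, for experimentation only (assumption)
  lazy val spark: SparkSession =
    SparkSession.builder().master("local[4]").appName("partitioned-window").getOrCreate()

  // Running sum that restarts per hypothetical "group" (0 = even, 1 = odd numbers)
  def groupedRunningSum(): DataFrame = {
    import spark.implicits._
    val numbersDf = (1 to 10).toList.toDF("number")
    val byGroup = Window.partitionBy("group").orderBy("number")
    numbersDf
      .withColumn("group", col("number") % 2)
      .withColumn("sum", sum(col("number")).over(byGroup))
  }

  def main(args: Array[String]): Unit = {
    val df = groupedRunningSum()
    // The plan should show a hash-partitioned Exchange instead of "Exchange SinglePartition"
    df.explain()
    df.orderBy("number").show()
    spark.stop()
  }
}
```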

The Physical Plan also shows the serialization. Reading it from bottom to top, Spark first performs the LocalTableScan, which already includes the number and double columns. Then it forces an Exchange SinglePartition step so that it can Sort by the number column and apply the sum Window.

So far, the SinglePartition exchange is an unavoidable step that Spark performs at the very end. But what happens if more columns are calculated after that? Let’s add another one and see the effects:

// Adding a "double_sum" column after the windowing
numbersDf.withColumn("double", col("number") * 2)
         .withColumn("sum", sum(col("number")).over(Window.orderBy("number")))
         .withColumn("double_sum", col("sum") * 2)
         .explain()

== Physical Plan ==
*(2) Project [number#3, double#64, sum#68L, (sum#68L * 2) AS double_sum#72L]
+- Window [sum(cast(number#3 as bigint)) windowspecdefinition(…) AS sum#68L],
          [number#3 ASC NULLS FIRST]
   +- *(1) Sort [number#3 ASC NULLS FIRST], false, 0
      +- Exchange SinglePartition
         +- LocalTableScan [number#3, double#64]

The Physical Plan is still very similar to the first one, with an added Project step to calculate the double_sum column. But wait a minute: we just saw that Spark performs an Exchange to SinglePartition before that. This means that double_sum, and every subsequent calculation, will also be done serially! Spark’s optimizations can be very clever depending on the query, but you still have to keep an eye on this effect. In this case, regardless of how many nodes your cluster has, they will all be waiting for one of them to do the whole work alone.
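A quick way to confirm this outside the plan is to ask how many partitions the resulting DataFrame has. The minimal sketch below rebuilds the double_sum example and checks rdd.getNumPartitions. It assumes a local session; local[4] is an arbitrary choice. Depending on the Spark version and configuration (for example, with adaptive query execution enabled), calling .rdd can trigger the upstream shuffle, so this check is not free on large data.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, sum}

object SinglePartitionCheck {
  // Local session with 4 worker threads, for experimentation only (assumption)
  lazy val spark: SparkSession =
    SparkSession.builder().master("local[4]").appName("single-partition-check").getOrCreate()

  // Same query as in the text: the window forces a single partition and double_sum stays there
  def withDoubleSum(): DataFrame = {
    import spark.implicits._
    val numbersDf = (1 to 10).toList.toDF("number")
    numbersDf.withColumn("double", col("number") * 2)
      .withColumn("sum", sum(col("number")).over(Window.orderBy("number")))
      .withColumn("double_sum", col("sum") * 2)
  }

  def main(args: Array[String]): Unit = {
    // One partition means one task for every step after the window
    println(s"partitions after window: ${withDoubleSum().rdd.getNumPartitions}")
    spark.stop()
  }
}
```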

In this simple example, the effects of the sequential computation are not noticeable. In our client’s query plan, however, which included many more calculations over gigabytes of data, they were surely slowing things down. So how do we tell Spark that, after the Window function is applied, it can parallelize the process again?

Visualizing the problem with SparkUI

Before discussing how to solve this, let’s have a look at the SparkUI and how our stages are laid out for our last example. Knowing how to quickly diagnose this problem in SparkUI can be very valuable, especially because many environments expose it out-of-the-box.

[Screenshot: SparkUI, Details for Job 2]

The first Stage (id 4) has 8 tasks (shown at the bottom right of the screenshot), while the second Stage (id 5) has only a single, serialized task. The DAG (Directed Acyclic Graph) visualization also shows that only 2 steps are performed in the first stage (the 2 blue blocks), while many more steps are processed in the second one. Since the latter stage is the one executed serially, we are probably not using all of our computing power here.

In larger queries, the DAG might contain many more blocks, which makes this visual analysis harder. Notice that we can focus our attention on the Exchange blocks, which are the ones that connect stages. Another tool for investigating whether, and where, the process is serialized is the SQL view in SparkUI associated with this job:


[Screenshot: SparkUI SQL view, Details for Query 2]

This view shows more details of each block of the query, and the Exchange block appears only once (instead of once per stage). Hovering the mouse over the Exchange block reveals that it is a SinglePartition exchange. From there on, every downstream calculation is done in this same partition, on one node.
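When the DAG gets too large to inspect visually, a rough programmatic check can complement SparkUI. The helper hasSinglePartitionExchange below is hypothetical and not part of Spark. It simply searches the string form of the executed physical plan for a SinglePartition exchange, like the ones in the plans printed earlier. Treat it as a heuristic, since the exact plan text varies between Spark versions. The local session is again an assumption.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, sum}

object PlanInspector {
  // Local session with 4 worker threads, for experimentation only (assumption)
  lazy val spark: SparkSession =
    SparkSession.builder().master("local[4]").appName("plan-inspector").getOrCreate()

  // Hypothetical helper: true if the physical plan text mentions a SinglePartition exchange
  def hasSinglePartitionExchange(df: DataFrame): Boolean =
    df.queryExecution.executedPlan.toString.contains("SinglePartition")

  // Builds two frames from the example: before and after the ordered window
  def exampleFrames(): (DataFrame, DataFrame) = {
    import spark.implicits._
    val doubled = (1 to 10).toList.toDF("number").withColumn("double", col("number") * 2)
    val windowed = doubled.withColumn("sum", sum(col("number")).over(Window.orderBy("number")))
    (doubled, windowed)
  }

  def main(args: Array[String]): Unit = {
    val (doubled, windowed) = exampleFrames()
    println(s"doubled:  ${hasSinglePartitionExchange(doubled)}")
    println(s"windowed: ${hasSinglePartitionExchange(windowed)}")
    spark.stop()
  }
}
```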

Repartition to the rescue

Now that we have a clear picture of the problem, we already know what we must do: tell Spark to repartition the dataset in order to parallelize the computation again:

// Repartitioning after the window calculation
numbersDf.withColumn("double", col("number") * 2)
         .withColumn("sum", sum(col("number")).over(Window.orderBy("number")))
         .repartition(4)
         .withColumn("double_sum", col("sum") * 2)
         .explain()

== Physical Plan ==
*(2) Project [number#3, double#174, sum#178L, (sum#178L * 2) AS double_sum#182L]
+- Exchange RoundRobinPartitioning(4)
   +- Window [sum(cast(number#3 as bigint)) windowspecdefinition(…) AS sum#178L],
             [number#3 ASC NULLS FIRST]
      +- *(1) Sort [number#3 ASC NULLS FIRST], false, 0
         +- Exchange SinglePartition
            +- LocalTableScan [number#3, double#174]

And voilà! The plan has an extra Exchange step, after which double_sum is calculated. The repartitioning used here explicitly sets the number of partitions to 4, but we could also have chosen another strategy, such as repartitioning by a column value. Check the documentation for other examples.
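To compare the two strategies mentioned here, the sketch below builds the repartitioned query twice. The first uses repartition(4), which gives round-robin partitioning as in the plan above. The second uses repartition(4, col("number")), which hash-partitions rows by the value of number. The local session, the helper names (afterWindow, roundRobin, byNumber) and the choice of number as the key are all hypothetical, for illustration only. In a real job, the key should spread rows evenly across partitions.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, sum}

object RepartitionStrategies {
  // Local session with 4 worker threads, for experimentation only (assumption)
  lazy val spark: SparkSession =
    SparkSession.builder().master("local[4]").appName("repartition-strategies").getOrCreate()

  // Hypothetical helper: run the window (single partition), apply a repartitioning
  // strategy, then compute double_sum on the repartitioned data
  def afterWindow(strategy: DataFrame => DataFrame): DataFrame = {
    import spark.implicits._
    val windowed = (1 to 10).toList.toDF("number")
      .withColumn("double", col("number") * 2)
      .withColumn("sum", sum(col("number")).over(Window.orderBy("number")))
    strategy(windowed).withColumn("double_sum", col("sum") * 2)
  }

  // Round-robin: rows are dealt evenly over 4 partitions, regardless of their values
  val roundRobin: DataFrame => DataFrame = _.repartition(4)

  // Hash partitioning: rows with the same "number" always land in the same partition
  val byNumber: DataFrame => DataFrame = _.repartition(4, col("number"))

  def main(args: Array[String]): Unit = {
    afterWindow(roundRobin).explain()
    afterWindow(byNumber).explain()
    spark.stop()
  }
}
```

Round-robin is a good default when the only goal is to spread the remaining work. Hash partitioning is mainly worth it when a later step (a join or aggregation on the same key) can reuse that distribution.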

To finish our analysis, let’s look at what the DAG looks like in SparkUI:

[Screenshot: SparkUI, Details for Job 8]

As expected, we now have 2 exchange transitions and 3 stages. The last stage runs in parallel with 3 tasks instead of the 4 partitions we chose. This is probably due to the small amount of data in the example: Spark might have decided that 3 tasks are enough.

[Screenshot: SparkUI SQL view, Details for Query 4]

The SQL query detail view is even clearer, with the sorting and windowing happening in one SinglePartition stage followed by the last Project step (double_sum computation) done in parallel due to the RoundRobinPartitioning.

We have seen how a window aggregation can generate single partitions in the query plan. We have also seen where to look for them and how to instruct Spark to repartition the dataset after the single partition step.

All the functionality shown here with the Scala API is also available in the PySpark and SparkSQL (HiveQL) APIs. Now you know how to use it to regain control of parallelization in your Spark jobs. We hope you found this article useful!
