
Updating a subset of a table using Spark and partitioning

Spark is a powerful engine for streaming and batch jobs.

At Anchormen, I use Databricks’ Spark notebooks for our Data Hub solution on Azure. Because we deal mostly with large volumes of data, we often have to update our Azure Blob storage using only a subset of the data that was already written. This inspired me to share some tips and tricks for when you need to update a subset of an entire table. For this example, I will be using only Spark’s native Scala API.

So, let’s get started!

To update only a subset of a table, the first thing you need to do is partition your dataset, so that you can update individual partitions instead of the whole table.

Suppose we have the following dataset:

ID   Date         Month   Visit
1    01.01.2020   Jan     Amsterdam
2    01.01.2020   Jan     Rotterdam
3    02.02.2020   Feb     Rotterdam

In your infinite wisdom, you foresee that you will probably have to update this table for a specific month, which means you will need to partition on the “Month” column.

The table structure in your storage should look like this:

-> root/
     -> Month=Jan/
     -> Month=Feb/

You can partition and write your table with Spark’s Scala API as follows:

dataFrame
     .write
     .partitionBy("Month")
     .mode("errorifexists") // Other options are: "ignore", "append" and "overwrite" (Scala Doc)
     .parquet(s"wasbs://$container@$account.blob.core.windows.net/")
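
As a side note, here is a minimal sketch of how you could read this partitioned table back and benefit from partition pruning. The container and account placeholders are the same as above; the val names and the import are my own additions for illustration:

import org.apache.spark.sql.functions.col

val basePath = s"wasbs://$container@$account.blob.core.windows.net/"
val january = spark.read
     .parquet(basePath)               // Spark discovers the Month=... directories as a partition column
     .filter(col("Month") === "Jan")  // only the Month=Jan directory needs to be scanned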

Now, let’s assume that you want to update only the January entries:

ID   Date         Month   Visit
1    03.01.2020   Jan     Amsterdam
2    03.01.2020   Jan     Rotterdam
3    02.02.2020   Feb     Rotterdam

If we wanted to update the whole table (all the partitions), this would be fairly easy: re-run the above code with the updated dataFrame object and set the mode to “overwrite”.
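
In code, that full-table overwrite would look roughly like this (a sketch; the name updatedDataFrame is just a placeholder for the updated data):

updatedDataFrame
     .write
     .partitionBy("Month")
     .mode("overwrite") // replaces everything under the target path
     .parquet(s"wasbs://$container@$account.blob.core.windows.net/")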

But we want to update only the January partition, perhaps because we want to reduce the amount of data we write (due to the cloud provider’s billing scheme), or because we simply can’t load the whole table in memory.

Since we only want to update the January entries, our new dataFrame object will look like the following:

ID   Date         Month   Visit
1    03.01.2020   Jan     Amsterdam
2    03.01.2020   Jan     Rotterdam
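
A quick sketch of how you might derive that subset from the updated data (the names updatedDataFrame and januaryDf are hypothetical, just for illustration):

import org.apache.spark.sql.functions.col

// Keep only the January rows of the updated data
val januaryDf = updatedDataFrame.filter(col("Month") === "Jan")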

And because the new dataFrame object doesn’t have the February partition, we can’t update the table using the above code, for several reasons:

  • Using the default “errorifexists” mode will abort the write, since a table already exists at that location.
  • Using the “overwrite” mode will overwrite the current table with the new table, losing the February partition.
  • Using the “append” mode will add the new January entries to the table while keeping the old January entries, leaving duplicates.
  • Using the “ignore” mode is similar to the default mode, but it does not raise an error when the table already exists; it simply leaves the old table as is.

As you can see, with the available modes we can’t update a single partition using only the new (subset) dataFrame object. To do that, we need to start from the beginning. Instead of partitioning the data with partitionBy, we will encode the partitioning column and its value directly in the output path, after a path separator “/”.

val monthLiteral = "Jan" // the month (partition) to be written
dataFrame
     .write
     //.partitionBy("Month")
     .mode("overwrite")
     .parquet(s"wasbs://$container@$account.blob.core.windows.net/monthColumn=$monthLiteral")

However, this will force us to run the code once for each month (partition) since we are writing each month separately to the given path.
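
If you have many months, a small loop over the distinct month values can do the repeated writes for you (a sketch; the val names are mine):

import org.apache.spark.sql.functions.col

// Collect the distinct months and write each one to its own monthColumn=... directory
val months = dataFrame.select("Month").distinct().collect().map(_.getString(0))

months.foreach { monthLiteral =>
     dataFrame
          .filter(col("Month") === monthLiteral)
          .write
          .mode("overwrite")
          .parquet(s"wasbs://$container@$account.blob.core.windows.net/monthColumn=$monthLiteral")
}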

For our example, we will run it twice:

1. With the dataFrame object consisting of the January entries and monthLiteral = "Jan":

ID   Date         Month   Visit
1    01.01.2020   Jan     Amsterdam
2    01.01.2020   Jan     Rotterdam

2. With the dataFrame object consisting of the February entries and monthLiteral = "Feb":

ID   Date         Month   Visit
3    02.02.2020   Feb     Rotterdam

3. After these two write operations, the final table will look like the following:

ID   Date         Month   monthColumn   Visit
1    01.01.2020   Jan     Jan           Amsterdam
2    01.01.2020   Jan     Jan           Rotterdam
3    02.02.2020   Feb     Feb           Rotterdam

We see two month columns, “Month” and “monthColumn”, because when Spark reads the table back it interprets every “column=value” directory in the path as a partition column. You can either drop the “Month” column from the dataFrame objects before the write operations or drop the “monthColumn” column from the table after the write operations. Totally up to you.
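
For instance, dropping the duplicate column could look like this (a sketch, not part of the steps above; the val name withoutDuplicate is mine):

// Option 1: drop the original "Month" column before writing a partition
dataFrame
     .drop("Month")
     .write
     .mode("overwrite")
     .parquet(s"wasbs://$container@$account.blob.core.windows.net/monthColumn=$monthLiteral")

// Option 2: drop the derived "monthColumn" column after reading the table back
val withoutDuplicate = spark.read
     .parquet(s"wasbs://$container@$account.blob.core.windows.net/")
     .drop("monthColumn")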

For now, we will continue keeping both columns.

4. Now, we want to update only the January entries. We re-run the second code snippet with monthLiteral = "Jan", using the updated dataFrame object with the new January entries:

ID   Date         Month   Visit
1    03.01.2020   Jan     Amsterdam
2    03.01.2020   Jan     Rotterdam

5. Our table, which is updated on the January partition using only the January entries, will look like this:

ID   Date         Month   monthColumn   Visit
1    03.01.2020   Jan     Jan           Amsterdam
2    03.01.2020   Jan     Jan           Rotterdam
3    02.02.2020   Feb     Feb           Rotterdam
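
To double-check the result, you can read the whole table back; Spark merges the partition directories into a single dataFrame (a sketch, the val name is mine):

val finalTable = spark.read
     .parquet(s"wasbs://$container@$account.blob.core.windows.net/")

finalTable.orderBy("ID").show() // the new January rows plus the untouched February row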

And that’s it!

I worked only with Spark’s native APIs due to a project limitation (all of the code had to be written against the native API), but if you have more freedom, there are other ways of reading/writing data with Spark that are worth investigating, such as Spark’s Hive API (which offers various read/write operations for Hive tables) and the Delta Lake storage layer (which offers ACID transactions for Spark and Databricks). I hope you found this short guide helpful!
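
As an illustration of the Delta Lake route (a sketch only, assuming a Delta-enabled environment such as Databricks; updatedJanuaryDf and the delta-table path are hypothetical names), its replaceWhere option lets you overwrite a single partition without the path trick above:

// Overwrite only the rows matching the predicate, leaving the other partitions intact
updatedJanuaryDf
     .write
     .format("delta")
     .partitionBy("Month")
     .mode("overwrite")
     .option("replaceWhere", "Month = 'Jan'")
     .save(s"wasbs://$container@$account.blob.core.windows.net/delta-table")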

 
