
Updating a subset of a table using Spark and partitioning
Spark is a powerful engine for streaming and batch jobs.
At Anchormen, I use Databricks’ Spark notebooks for our Data Hub solution on Azure. Because we deal mostly with data that is huge in volume, we often have to update our Azure Blob storage using only a subset of the data that was already written. This inspired me to share some tips and tricks for when you need to update a subset of an entire table. For this example, I will be using Spark’s native Scala API only.
So, let’s get started!
In order to update only a subset of a table, the first thing you need to do is partition your dataset, so that you can update individual partitions instead of the whole table.
Suppose we have the following dataset:
ID | Date | Month | Visit |
---|---|---|---|
1 | 01.01.2020 | Jan | Amsterdam |
2 | 01.01.2020 | Jan | Rotterdam |
3 | 02.02.2020 | Feb | Rotterdam |
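If you want to follow along in a notebook, here is a minimal sketch of how this sample dataset could be built; the `spark` session is assumed to already exist (as it does in Databricks notebooks), and the column names simply mirror the table above.

```scala
import spark.implicits._

// Hypothetical sample data mirroring the table above; dates are kept as strings for simplicity.
val dataFrame = Seq(
  (1, "01.01.2020", "Jan", "Amsterdam"),
  (2, "01.01.2020", "Jan", "Rotterdam"),
  (3, "02.02.2020", "Feb", "Rotterdam")
).toDF("ID", "Date", "Month", "Visit")
```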
In your infinite wisdom, you foresee that you will probably have to update this table for a specific month, which means you will need to partition on the “Month” column.
The table structure in your storage should look like this:
```
root/
├── Month=Jan/
└── Month=Feb/
```
You can partition and write your table with Spark’s Scala API as follows:
```scala
dataFrame
  .write
  .partitionBy("Month")
  .mode("errorifexists") // Other options are: "ignore", "append" and "overwrite" (Scala Doc)
  .parquet(s"$container@$account.blob.core.windows.net/")
```
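A side benefit of this layout: when you later read the table and filter on the partition column, Spark only scans the matching directories. A minimal sketch, reusing the same storage path (`januaryVisits` is a hypothetical name):

```scala
// Partition pruning: only the Month=Jan directory is read from storage.
val januaryVisits = spark.read
  .parquet(s"$container@$account.blob.core.windows.net/")
  .filter("Month = 'Jan'")
```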
Now, let’s assume that you want to update only the January entries:
ID | Date | Month | Visit |
---|---|---|---|
1 | 03.01.2020 | Jan | Amsterdam |
2 | 03.01.2020 | Jan | Rotterdam |
3 | 02.02.2020 | Feb | Rotterdam |
If we wanted to update the whole table (all the partitions), this would be fairly easy: re-run the above code with the updated dataFrame object and the mode set to “overwrite”.
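In code, that full rewrite would look roughly like this; `updatedDataFrame` is a hypothetical name for the DataFrame holding all three updated rows:

```scala
// Rewrites the entire table: every partition must be present in updatedDataFrame.
updatedDataFrame
  .write
  .partitionBy("Month")
  .mode("overwrite")
  .parquet(s"$container@$account.blob.core.windows.net/")
```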
But we want to update only the January partition, perhaps because we want to reduce the amount of data we write due to the cloud provider’s billing scheme, or because we simply can’t load the whole table into memory.
Since we only want to update the January entries, our new dataFrame object will look like the following:
ID | Date | Month | Visit |
---|---|---|---|
1 | 03.01.2020 | Jan | Amsterdam |
2 | 03.01.2020 | Jan | Rotterdam |
And because the new dataFrame object doesn’t contain the February partition, we can’t update the table using the above code, for several reasons:
- Using the default “errorifexists” mode will abort the write, since there is already a table at that location.
- Using the “overwrite” mode will overwrite the current table with the new table, losing the February partition.
- Using the “append” mode will add the new January entries to the table while keeping the old January entries.
- Using the “ignore” mode is similar to the default mode, but it does not raise an error when the table already exists, and will leave the old table as is.
As you can see, with the available modes, we can’t update a single partition using only the new (subset) dataFrame object. In order to do that, we need to start from the beginning: instead of letting Spark partition the data explicitly, we will encode the partition column in the write path ourselves, after a path separator “/”.
```scala
val monthLiteral = month_to_be_written

dataFrame
  .write
  //.partitionBy("Month")
  .mode("overwrite")
  .parquet(s"$container@$account.blob.core.windows.net/monthColumn=$monthLiteral")
```
However, this will force us to run the code once for each month (partition) since we are writing each month separately to the given path.
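In practice, you would probably wrap this write in a small helper and call it once per month; a sketch, where `writeMonth` and `basePath` are hypothetical names:

```scala
import org.apache.spark.sql.DataFrame

// Hypothetical helper: writes one month's rows into its own monthColumn=<month> directory.
def writeMonth(df: DataFrame, month: String, basePath: String): Unit = {
  df.filter(s"Month = '$month'")
    .write
    .mode("overwrite")
    .parquet(s"$basePath/monthColumn=$month")
}

// Initial load: one call per month present in the data.
val basePath = s"$container@$account.blob.core.windows.net"
writeMonth(dataFrame, "Jan", basePath)
writeMonth(dataFrame, "Feb", basePath)
```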
For our example, we will run it twice:
1. With the dataFrame object consisting of the January entries, with monthLiteral = "Jan":
ID | Date | Month | Visit |
---|---|---|---|
1 | 01.01.2020 | Jan | Amsterdam |
2 | 01.01.2020 | Jan | Rotterdam |
2. With the dataFrame object consisting of the February entries, with monthLiteral = "Feb":
ID | Date | Month | Visit |
---|---|---|---|
3 | 02.02.2020 | Feb | Rotterdam |
3. After these two write operations, the final table will look like the following:
ID | Date | Month | monthColumn | Visit |
---|---|---|---|---|
1 | 01.01.2020 | Jan | Jan | Amsterdam |
2 | 01.01.2020 | Jan | Jan | Rotterdam |
3 | 02.02.2020 | Feb | Feb | Rotterdam |
We see two month columns, “Month” and “monthColumn”, because Spark’s partition discovery treats each `column=value` directory in the path as a partition column. You can either drop the “Month” column from the dataFrame objects before the write operations, or drop “monthColumn” from the table after the write operations. Totally up to you.
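If you go with the first option, the drop is a one-liner before the write; a minimal sketch:

```scala
// Drop the redundant "Month" column before writing; the value survives in the monthColumn path.
val withoutMonth = dataFrame.drop("Month")
```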
For now, we will continue keeping both columns.
4. Now, we want to update only the January entries. We will re-run the second code snippet with monthLiteral = "Jan", using the updated dataFrame object with the new January entries:
ID | Date | Month | Visit |
---|---|---|---|
1 | 03.01.2020 | Jan | Amsterdam |
2 | 03.01.2020 | Jan | Rotterdam |
5. Our table, which is updated on the January partition using only the January entries, will look like this:
ID | Date | Month | monthColumn | Visit |
---|---|---|---|---|
1 | 03.01.2020 | Jan | Jan | Amsterdam |
2 | 03.01.2020 | Jan | Jan | Rotterdam |
3 | 02.02.2020 | Feb | Feb | Rotterdam |
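If you want to double-check the result, reading the base path back picks up monthColumn through partition discovery; a quick sketch, reusing the same storage path:

```scala
// Partition discovery adds monthColumn from the directory names.
val fullTable = spark.read.parquet(s"$container@$account.blob.core.windows.net/")
fullTable.orderBy("ID").show()
```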
And that’s it!
I worked only with Spark’s native APIs due to a project limitation (all the code had to be written against the native API), but if you have more freedom, there are other ways of reading and writing data with Spark that are worth investigating, such as Spark’s Hive API (which offers various read/write operations for Hive tables) and the Delta Lake storage layer (which offers ACID transactions for Spark and also Databricks).
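As a taste of the Delta Lake route, a single partition can be replaced in place with the `replaceWhere` option. A minimal sketch, assuming the table is stored as a Delta table partitioned on Month at a hypothetical `/delta-table` path, and `januaryUpdates` holds only the new January rows:

```scala
// Overwrites only the rows matching the predicate; other partitions stay untouched.
januaryUpdates
  .write
  .format("delta")
  .mode("overwrite")
  .option("replaceWhere", "Month = 'Jan'")
  .save(s"$container@$account.blob.core.windows.net/delta-table")
```

I hope you found this short guide helpful!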