The Power of an Azure Data Hub

For a recent data project, we received the request to load 90 tables from a SQL Server into Azure Blob storage, with new data added daily. In total, the tables contained around 640 million records and were 100GB in size. Obviously, we could have built 90 separate pipelines, but this inefficient method would have taken several weeks of development, and maintenance would have been a nightmare. Instead, we decided to build a single Azure Data Factory pipeline to load all the tables in one go, which significantly reduced the workload. In the end, it took us less than a week to develop, test, and deploy everything!

We figured this is such an interesting use case for the Anchormen Data Hub that we’d walk you through the whole process, so you can see first-hand the power of activating your data. For this project the client was working with Microsoft Azure, but it goes without saying that something similar can be built with AWS or GCP.

With all that being said, let’s get down to the real work.

Overview

As you can imagine, this is not a new problem and a little research leads you to an excellent blog post by Michał Pawlikowski on the subject. He gives a step-by-step approach to loading data from two source databases into a target database. What we did was a variation of his method tailored to our specific needs.

A high-level overview of our solution would look something like this:

  1. A load_state table that holds all the necessary information about the tables that will be loaded. Most importantly, it keeps track of the modified date when the table was last loaded. This is vital to avoid duplicate data loading. New source tables that need to be loaded can simply be added to the load_state.
  2. Set a global maximum modified timestamp for all tables that will be loaded. For all tables we get records modified between the last time the data was loaded and this maximum timestamp. This ensures we get a consistent ‘snapshot’ from the database up to the maximum timestamp. We do omit records being added or updated during the load, but we can live with that.
  3. An Azure Data Factory pipeline with a Lookup activity followed by a ForEach activity. The Lookup simply gets all the information from the load_state table and the ForEach activity executes a pipeline that loads all the data that has changed since the last load. The load on the SQL Server can be managed by specifying the parallelism of the ForEach activity. For our initial load we set this activity to “sequential” to avoid overloading the database. We increase parallelism for subsequent incremental loads to speed up the overall process.
  4. The actual data loading is done in batches of between 1 million and 2.5 million records. This helps us avoid memory errors in the Integration Runtime and results in Parquet files between 50MB and 150MB.

Once a source table has loaded successfully, the ‘modified on’ field for the table is updated with the timestamp used in the query. This way we keep track of what data from a table we have already loaded. The next run will only select records modified after this timestamp.

The load_state table

The cornerstone of the solution is a small SQL table we refer to in this post as ‘load_state’. It contains all tables that have to be loaded, along with the metadata required to load each table the right way. These are the fields used:

  1. id: the technical ID of the record as an auto-incremented integer
  2. source_database: the database name
  3. source_table: the table name within source_database that needs to be loaded
  4. modified_column: the name of the column within the source_table that holds the timestamp on which a record was last modified.
  5. modified_value: the timestamp up to which records have already been loaded.
  6. projection: the column definition to load. By default, this is ‘*’ so it simply gets all columns from source_table. But it is also possible to transform or ignore columns. Within our pipeline this is used to exclude all Personally Identifiable Information (PII) before loading the data into Azure.
  7. filter: an optional filter that can be used to exclude specific records from the load. For example, you might want to exclude records from certain countries.
  8. batch_size: the number of records to fetch and store in a single Parquet file. By default, this is 2.5 million, which results in an average Parquet file size of 75MB. Wider tables typically have fewer rows per file while narrow tables have more. Makes sense, right?
  9. version: an integer indicating the version of the table, used to handle schema changes in the source table. The version is part of the path in which the loaded data is stored, so different table versions end up in different paths.
  10. enabled: a Boolean indicating if the table needs to be loaded. This flag can easily be used to exclude or include individual tables from the loading process, for example to disable a table if there is an issue with it.
  11. priority: an integer value indicating the priority of the table. This is used to determine the load order; tables with the highest priority are loaded first. This can be helpful if certain tables have extensive processes on top of them and need to be loaded as soon as possible in order to meet SLAs.

This is the create statement used:

CREATE TABLE load_state (
    id INT IDENTITY(1,1) PRIMARY KEY,
    source_database VARCHAR(50) NOT NULL,
    source_table VARCHAR(50) NOT NULL,
    modified_column VARCHAR(50) NOT NULL DEFAULT 'modified_on',
    modified_value VARCHAR(20) NOT NULL DEFAULT '1900-01-01T00:00:00',
    priority INT NOT NULL DEFAULT 0,
    projection VARCHAR(5000) NOT NULL DEFAULT '*',
    filter VARCHAR(5000) NOT NULL DEFAULT '1=1',
    version INT NOT NULL DEFAULT 0,
    batch_size INT NOT NULL DEFAULT 2500000,
    enabled BIT NOT NULL DEFAULT 1
)

When loaded, this looks like the figure below (ordered by priority). It shows different priority, projection, and batch_size values for individual tables, which we use to control what is loaded and how.
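To give a feel for its contents, rows can be added with plain INSERT statements and the remaining columns fall back to their defaults. The database and table names below are purely made-up examples, not actual tables from this project:

INSERT INTO load_state (source_database, source_table, priority, projection, batch_size)
VALUES ('sales', 'orders', 10, '*', 2500000),
       ('sales', 'customers', 5, 'id, name, country, modified_on', 1000000),
       ('hr', 'employees', 0, '*', 2500000);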

This table is stored in an Azure SQL Database using the serverless compute tier, which is currently in preview. It automatically scales with the workload and pauses the database when there is no load. This makes it the most cost-efficient option for a table that is only accessed a couple of hours a day (at least in our case) and can be paused most of the time. We are only billed for storage of the table and two to three hours of compute during the runtime of the pipeline. For such a small table it takes around 30 seconds to become operational after it was paused, which is reasonable for our data pipeline.

Next, we defined a VIEW on top of this table so that we only get enabled tables, ordered by priority. This view is used by the Lookup activity in Azure Data Factory, so it only gets the enabled tables and starts with the highest-priority ones. Note that SQL Server only allows ORDER BY in a view when TOP is also specified, hence the TOP 1000. The view is defined as:

CREATE VIEW [load_state_prio] AS
SELECT TOP 1000 * FROM [load_state] WHERE enabled = 1 ORDER BY priority DESC

The last part of the load state is a Stored Procedure used to update the modified_value after a single table has been loaded successfully. This was taken directly from Michał’s blog, credit where credit is due!

GO
CREATE PROCEDURE [dbo].[update_modified_value]
    @source_database VARCHAR(50),
    @source_table VARCHAR(50),
    @new_modified_value VARCHAR(20)
AS
SET NOCOUNT ON;

UPDATE dbo.load_state
SET modified_value = @new_modified_value
WHERE source_database = @source_database AND source_table = @source_table;

GO
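For reference, a manual call to the procedure looks like this (the values are made up):

EXEC [dbo].[update_modified_value]
    @source_database = 'sales',
    @source_table = 'orders',
    @new_modified_value = '2019-06-01T04:00:00';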

Data Factory Pipeline 1: lookup and iterate

With the load_state table in place, it is time to look at the first Data Factory pipeline and associated Datasets. It is a very simple pipeline which sets some global variables, gets all tables to load from the load_state_prio VIEW and iterates over these to load them using another pipeline. A screenshot of this pipeline is shown below.

It contains the following four activities:

  1. set_max_modified_on: this activity sets the max_modified_on variable used to query data from all tables. This ensures that we get a fixed database state in which all records share the same maximum modified_on value. It differs from Michał’s approach, which gets the current maximum value for each table at runtime. With that approach, records in one table may be loaded while the records they reference in another table are not, because that table was loaded before those changes were made. We use a global maximum timestamp to avoid this issue. The downside is that we do not load records modified after this timestamp, even if they already exist when we read the source table (we get those next time).
  2. set_load_date: this activity sets the load_date variable using a “yyyyMMddHHmm” format (a sketch of both Set Variable expressions follows this list). The records of all the tables will be written into directories named after this load date. This way, we can track how each load ended up in blob storage. The actual blob storage path is:
    /load_date<yyyyMMddHHmm>/<databasename>/<tablename>/version=<version>/parquet files
  3. get_conf: gets all tables to load from the load_state_prio VIEW. This results in a list of tables to be loaded, ordered by priority.

  4. per_table: this executes a pipeline that actually loads an individual table. This pipeline gets all parameters from the load_state required to load the table (i.e. the source_table, projection, batch_size, etc.). The internals of this pipeline are described in the next section.
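In essence, the two Set Variable activities apply a formatDateTime expression to the current UTC time; a rough sketch (the exact timestamp format for max_modified_on shown here is illustrative) looks like this:

set_max_modified_on: @formatDateTime(utcnow(), 'yyyy-MM-ddTHH:mm:ss')
set_load_date:       @formatDateTime(utcnow(), 'yyyyMMddHHmm')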

Data Factory Pipeline 2: copy and update state

This pipeline is responsible for copying the data of a single table to blob storage and is highly parameterized. It gets all parameters, like the database, table, columns, and batch_size, from the ForEach activity described above. The actual batch-wise copy is done in a third pipeline, which is described in the next section. If the copy was successful, the load_state table is updated using the stored procedure. The actual configuration of the Stored Procedure activity is shown below.
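In essence, the activity calls update_modified_value and maps the pipeline parameters onto the procedure parameters, roughly like this (parameter names as used in the copy query further below):

Stored procedure name: [dbo].[update_modified_value]
Parameters:
    source_database    = @{pipeline().parameters.database}
    source_table       = @{pipeline().parameters.table}
    new_modified_value = @{pipeline().parameters.max_modified}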

Possible improvement

It is possible that a first batch copy succeeds while a later one fails. In that scenario the data is only partially loaded, which is not good. This can be fixed in this pipeline by adding some cleanup when the batch_copy_to_integration activity fails. This cleanup removes the partially loaded data from Blob storage, so we end up with a clean state in which nothing got loaded for that table. The load_state does not get updated, so the next run will properly fetch the missing data.
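A minimal sketch of such a cleanup, assuming the load_date and version are passed down as pipeline parameters (the names here are illustrative): a Delete activity on the failure path of batch_copy_to_integration, with recursive deletion enabled and a folder path along the lines of

load_date@{pipeline().parameters.load_date}/@{pipeline().parameters.database}/@{pipeline().parameters.table}/version=@{pipeline().parameters.version}/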

Data Factory Pipeline 3: batchwise loading

This is where the actual data copying happens. The concept is simply to copy a batch of data from the source table until the number of copied records is smaller than the specified batch size. The Data Factory implementation is an Until activity with the following expression:

@less(activity('copy_batch').output.rowsCopied,
int(pipeline().parameters.batch_size))

This simply configures the Data Factory to keep executing the activities inside the Until loop until the number of rows copied is smaller than the specified batch size. In each iteration the pipeline below is executed, which copies a batch and increases the offset in order to copy the next batch (if needed). Two variables are used to manage the offset for each copy.
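The two variables could be defined on this pipeline roughly as follows (defaults assumed; Data Factory variables can only be of type String, Boolean, or Array, which is also why offset is a String):

offset       String   default "0"    the OFFSET used in the paging query
iterations   Array    default []     one element is appended per completed batch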

 

As shown in the figure, the actual implementation has three activities:

  1. copy_batch: this is a Copy Data activity which executes a query and stores the result in blob storage. The information from the load_state is used to create the following query:
SELECT @{pipeline().parameters.columns}
FROM [@{pipeline().parameters.database}].[@{pipeline().parameters.table}]
WHERE @{pipeline().parameters.modified_column} > '@{pipeline().parameters.latest_modified}'
AND @{pipeline().parameters.modified_column} <= '@{pipeline().parameters.max_modified}'
AND (@{pipeline().parameters.filter})
ORDER BY id ASC
OFFSET @{variables('offset')} ROWS FETCH NEXT @{pipeline().parameters.batch_size} ROWS ONLY;

A single query gets the records from the source table that match the modified criteria and the filter. An offset variable is used to control which batch of the result will be copied. This variable is increased after each copy to avoid copying duplicate records.
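With made-up values filled in (a table orders in database sales, the default projection and filter, and a batch size of 2,500,000), the first batch query would resolve to something like:

SELECT *
FROM [sales].[orders]
WHERE modified_on > '1900-01-01T00:00:00'
AND modified_on <= '2019-06-01T04:00:00'
AND (1=1)
ORDER BY id ASC
OFFSET 0 ROWS FETCH NEXT 2500000 ROWS ONLY;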

  2. Add_iteration: this Append Variable activity simply adds an empty element to an array variable called ‘iterations’. The length of the array is used to keep track of the iteration count and to calculate the next offset.
  3. Set_offset: this Set Variable activity is used to increase the offset variable used in the SQL query. It is initialized to 0 at the start and incremented after each copy. The expression used is sketched below.
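A sketch of that expression, assuming offset is a String variable:

@string(mul(length(variables('iterations')), int(pipeline().parameters.batch_size)))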

It multiplies the length of the iterations array by the batch size specified for the current table being loaded. Let’s say the batch size is 100 and the copy activity has just copied records 0 to 100. The Add_iteration will set the array length to 1, after which the offset will be set to 1*100 = 100. The next copy activity will then fetch and copy records 100 to 200. This continues until the number of records copied is less than 100, which indicates we have reached the end of the SQL result and are done with the copy.

The implementation of the offset management is a bit strange because Data Factory doesn’t allow a variable to refer to itself within a Set Variable expression. Something as simple as ‘offset = offset + batch_size’ is not possible. As a result, we came up with the method described above, but any improvements on that are welcome 😊

The ‘ORDER BY id ASC’ is the only part that is not parameterized. SQL Server requires the result to be sorted before it is possible to use OFFSET … FETCH NEXT …. All tables have a technical (auto-incremented) id column, so we can hard-code it in the query. We will probably also put the sort column in the load_state table and parameterize it in the query to make it more flexible for future use.
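A possible way to do that, sketched here but not implemented yet: add a sort_column to the load_state and reference it in the query.

ALTER TABLE load_state ADD sort_column VARCHAR(50) NOT NULL DEFAULT 'id';

-- and in the copy_batch query:
ORDER BY @{pipeline().parameters.sort_column} ASC
OFFSET @{variables('offset')} ROWS FETCH NEXT @{pipeline().parameters.batch_size} ROWS ONLY;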

A disadvantage of the ORDER BY is that it makes the query more expensive and thus slower to execute. Still, it worked fine for our initial load, even for tables with 400 million records. An alternative approach could be to split the data on an attribute, for example the modification value or country code, and load it in batches that way.

To be honest, the batching only took place during the initial load of the data, as the number of records fetched was bigger than the specified batch sizes. Logically, the incremental loads are a lot smaller and always fit within the batch size, so these are copied daily in a single batch.

Monitoring

The monitoring of this pipeline is rather simple as all sub-pipelines are also tracked within the Data Factory’s Monitor view. When the load of a single table fails it is shown in the Monitor. The error can be viewed, and the specific part of the pipeline can be re-executed with the exact parameters as they were provided during the first execution.

The screenshot below shows a typical run of this pipeline. It starts with 1_load_tables which gets data from the load_state and starts loading the individual tables in parallel, in this case two at a time.

Conclusion

And there it is: our solution to load a significant number of tables, usable for the initial load as well as for the incremental loads. Using the load_state table allows us to add, remove, and change the tables that need to be loaded without adding or changing the actual Data Factory pipelines. Our first tests only partially loaded the data because of memory errors on the SQL Server. We could simply fix the problem by disabling all source tables in the load_state except for the ones that failed and rerunning the pipeline.

Using the batch-wise copy method we can control the number and size of the Parquet files being written to blob storage. The batches also give us some control over the load on the SQL Server and the Integration Runtime. Without batch loading we ran into memory issues, which were solved this way.

Up until now, we are happy with the way this pipeline behaves and have had very few issues with it. The initial load was configured to run sequentially and took around 5 hours to complete. Our daily incremental loads run in parallel, two at a time, and take about 75 minutes. We could probably speed this up further by increasing parallelism, but there is no need for that at the moment. Storing the data in Parquet files compressed it significantly, as Parquet uses columnar compression: the 100GB reported by the SQL Server got compressed to 17GB in blob storage. Not only does this minimize storage costs, the data is also faster to read and move.

We hope this blog post helps you accelerate your Data Factory development. Any improvements or alternative approaches are welcome. Let us know if you need help with your own data projects or are looking to accelerate your data strategy – check out the Anchormen Data Hub solution.
