
AWS SageMaker Series: Abalone Pipeline Walkthrough

Part II

In the previous post, we outlined the structure and topics that we would discuss throughout this entire blog series. The first major topic that we will dive into is how to build a basic machine learning pipeline and automate different steps of the ML workflow with Amazon SageMaker Pipelines, which is a new capability of Amazon SageMaker.

SageMaker provides an example pipeline, based on the Abalone age prediction problem and the UCI Machine Learning Abalone Dataset, that showcases the pipeline steps of a typical machine learning workflow: data preprocessing, training, evaluation, model creation, batch transformation, and model registration. The dataset contains physical measurements of abalones, which are large, edible sea snails. The goal is to train an XGBoost regression model that can accurately predict the age of an abalone based on these measurements.
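To make the prediction target concrete: the dataset's label is the number of shell rings, and an abalone's age is conventionally estimated as the ring count plus 1.5 years. A small illustration with made-up sample rows (column names follow the UCI documentation):

```python
# Made-up sample rows for illustration; column names follow the UCI Abalone
# documentation. The label column is "rings".
sample = [
    {"sex": "M", "length": 0.455, "diameter": 0.365, "rings": 15},
    {"sex": "F", "length": 0.530, "diameter": 0.420, "rings": 9},
]
for row in sample:
    row["age"] = row["rings"] + 1.5  # conventional age estimate in years
```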

(Figure: the Abalone pipeline DAG)

The pipeline is structured as a directed acyclic graph (DAG) of interconnected Pipeline Step objects. The DAG is defined in a JSON pipeline definition, where a dependency between two steps is created whenever the output of one step is passed as input to a subsequent step. We will walk through and explain the required setup for each of these steps.

The first step in the Abalone pipeline preprocesses the input data, which is already stored in S3, and then splits the cleaned data into training, validation and test sets. The resulting training data is then used as the input for the training step to fit an XGBoost regression model. After model training is completed, the trained model is used to make predictions on the test data and evaluate the model's performance. If the model evaluation metrics pass the conditional checks, the pipeline continues in parallel to register the model and run a batch transform job to get predictions for an additional input dataset.

In the following sections, we will go through the specific steps (with code examples) for how to set up the necessary process and assemble this pipeline in SageMaker.

Abalone Process

To kick off the Abalone pipeline and our ML workflow, we need to perform some data preprocessing and feature engineering on our raw input data, which is stored in S3, before splitting it into training, validation and test datasets using a scikit-learn preprocessing script. SageMaker provides a dedicated SKLearnProcessor for handling such processing tasks.

from sagemaker.sklearn.processing import SKLearnProcessor

sklearn_processor = SKLearnProcessor(
    framework_version="0.23-1",
    role=role,                     # SageMaker execution role, assumed defined earlier
    instance_type="ml.m5.xlarge",  # example instance type; choose your own
    instance_count=1,
)

Next, we need to construct a ProcessingStep with our SKLearnProcessor instance and configure the necessary inputs and outputs for the step, as well as specify the S3 URI or a local path to the preprocessing script, so the processing job knows what code to execute.

A SageMaker ProcessingInput takes an S3 path to a particular data source and tells the processing step where that data should be copied onto the container locally and used by the processor instance when it runs. Because this is the start of the pipeline, we can simply define our input as the S3 location of our raw input data with the ProcessingInput class.

With regards to the outputs, a SageMaker ProcessingOutput works in the opposite way: it writes out the data stored in a local path on the processing container to a designated path in S3. For the AbaloneProcess step, we will end up with three different outputs: training, validation and test datasets, which will need to be written to S3 when the job completes. And we can configure these outputs with the ProcessingOutput class.

from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
step_process = ProcessingStep(
    name="AbaloneProcess",
    processor=sklearn_processor,
    inputs=[
        ProcessingInput(
            source=input_data,  # S3 location of the raw input data, assumed defined earlier
            destination="/opt/ml/processing/input",
        ),
    ],
    outputs=[
        ProcessingOutput(
            output_name="train", 
            source="/opt/ml/processing/train",
        ),
        ProcessingOutput(
            output_name="validation", 
            source="/opt/ml/processing/validation",
        ),
        ProcessingOutput(
            output_name="test", 
            source="/opt/ml/processing/test",
        )
    ],
    code="abalone/preprocessing.py"
)
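The preprocessing script itself is not shown in this post. As a rough, hypothetical sketch of its splitting logic (the real abalone/preprocessing.py would also handle feature engineering, such as encoding the categorical sex column, before writing each split as a CSV into the corresponding /opt/ml/processing/<name> directory; the 80/10/10 ratios are an assumption):

```python
import pandas as pd

def split_dataset(df: pd.DataFrame, seed: int = 0):
    """Shuffle a DataFrame and split it roughly 80/10/10 into
    train/validation/test subsets (split ratios are an assumption)."""
    df = df.sample(frac=1.0, random_state=seed).reset_index(drop=True)
    n = len(df)
    train = df.iloc[: int(0.8 * n)]
    validation = df.iloc[int(0.8 * n) : int(0.9 * n)]
    test = df.iloc[int(0.9 * n) :]
    return train, validation, test
```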

It is considered best practice to also assign names to the outputs of a processing job. These Step Properties can then be extracted and used in subsequent pipeline steps. For example, the training step will then use the train and validation output channels to train a model, and the evaluation step can extract the trained model artifacts and test channel in order to evaluate the model.

AbaloneTrain

The next step in the pipeline would be to train the actual model. We will use the SageMaker built-in XGBoost Algorithm to train a regression model on processed outputs from the AbaloneProcess step. In order to do that, we first need to retrieve the XGBoost algorithm container from the Amazon Elastic Container Registry (ECR) and configure a SageMaker Estimator with the necessary role and training instance type. We should also specify the model_path where we want to save the trained model artifacts. Once we have our estimator, we can set some static hyperparameters. It should be pointed out that we are trying to solve a regression problem, so we should specify an appropriate objective hyperparameter.

from sagemaker.estimator import Estimator
model_path = f"s3://{default_bucket}/AbaloneTrain"
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
)
xgb = Estimator(
    image_uri=image_uri,
    instance_type="ml.m5.xlarge",  # example instance type; choose your own
    instance_count=1,
    output_path=model_path,
    role=role,                     # SageMaker execution role, assumed defined earlier
)
xgb.set_hyperparameters(
    objective="reg:linear",
    num_round=50,
    # any additional hyperparameters
)

Now, we can continue to define a TrainingStep, where the estimator we defined above and the proper training data channels are required. We will pass in the training and validation datasets from the preprocessing step as SageMaker TrainingInput objects. The S3 data path information can be extracted from the AbaloneProcess step using step properties.

from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
step_train = TrainingStep(
    name="AbaloneTrain",
    estimator=xgb,
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "train"
            ].S3Output.S3Uri,
            content_type="text/csv"
        ),
        "validation": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "validation"
            ].S3Output.S3Uri,
            content_type="text/csv"
        )
    },
)

NOTE: The inputs of the training step are represented as a dictionary, not a list (as in the processing step); the keys indicate which training input channels should be used during the training job.

This SageMaker TrainingStep will load the data from the two input channels, configure and launch a training job with our Estimator and hyperparameters, train an XGBoost regression model and save it to the local directory specified by the SM_MODEL_DIR environment variable so that it can be deployed later on. At the end of the training job, SageMaker uploads the serialized trained model artifacts model.tar.gz to the model path we specified in our estimator configuration.

AbaloneEval

When the training job is completed, we can then evaluate the model’s performance on the test set. To do so, we will create another ProcessingStep, this time using the SageMaker ScriptProcessor instead of the SKLearnProcessor. In the AbaloneProcess step, we used the SKLearnProcessor, which runs in a SageMaker built-in container, ready to use out of the box without any customization. But our code cannot always be executed in SageMaker's pre-defined containers.

The ScriptProcessor handles Amazon SageMaker Processing tasks for jobs using a machine learning framework, which allows for providing a script to be run as part of the Processing Job. This way, the xgboost package can be made available and imported during the job, which is not available when using the SKLearnProcessor. We can easily use the XGBoost framework to deserialize and load our trained estimator, make predictions on the test set and calculate several metrics in order to evaluate the model’s performance.

We will follow a similar setup as the AbaloneProcess step, by creating an instance of a ScriptProcessor that will be passed to another ProcessingStep object. When configuring the SageMaker ScriptProcessor, we pass in the same XGBoost framework image URI that we used to define the Estimator.

from sagemaker.processing import ScriptProcessor
script_processor = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type="ml.m5.xlarge",  # example instance type; choose your own
    instance_count=1,
    role=role,                     # SageMaker execution role, assumed defined earlier
)

NOTE: The command argument indicates which command to run, along with any additional command-line flags, once the container is spun up. In the Abalone case, we want to execute a Python script.

The metrics calculated in the custom Python evaluation script should be formatted in the same way as described in the SageMaker Model Metrics. The metrics format also depends on which type of machine learning problem you would like to solve. We have trained an XGBoost regression model, so we should follow the same nested structure and key-value headers for the regression metrics.
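For example, an evaluation report for our regression model might look like the following (the metric values are made up; only the nested key structure, which follows the SageMaker model quality regression metrics format, matters here):

```python
import json

# Made-up metric values; the nested key structure follows the SageMaker
# model quality regression metrics format.
report = {
    "regression_metrics": {
        "mse": {"value": 4.5, "standard_deviation": 2.2},
    }
}
print(json.dumps(report, indent=2))
```

This is also the structure that the condition step will later query with the json_path regression_metrics.mse.value.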

SageMaker provides a dedicated way to store information that is calculated during a processing step. PropertyFiles can be used to store information from the output of a processing step, including evaluation metrics. These results can then be used later on to determine which execution path to take in the pipeline. We want to use the evaluation metrics to determine whether or not the pipeline execution should continue and register the trained model in the SageMaker Model Registry.

To configure our PropertyFile, it is important to specify the path parameter, indicating the name of the JSON file that the evaluation metrics should be saved to. We also need to give the property file output a name corresponding to the name of the ProcessingOutput that we will define in the evaluation processing step. This enables the property file to actually capture the evaluation metrics that are calculated when the step is executed.

from sagemaker.workflow.properties import PropertyFile
evaluation_report = PropertyFile(
    name="EvaluationReport",
    output_name="evaluation",
    path="evaluation.json"
)

Now we can define our evaluation processing step. The setup is more or less the same as for the AbaloneProcess step, except we pass in our ScriptProcessor instance instead. We can extract the trained model artifacts from the S3ModelArtifacts property of the AbaloneTrain step along with the test channel from the AbaloneProcess step as our inputs.

As for the outputs, we only need to create a ProcessingOutput for the evaluation metric JSON file. Again, it is imperative that the name assigned to this output matches the output_name we defined for our property file above. Otherwise, we will not be able to access the evaluation metrics in subsequent pipeline steps.

We also need to specify some other required step parameters, including the local path to the evaluation script to be executed during the processing job and our property file instance.

from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
step_eval = ProcessingStep(
    name="AbaloneEval",
    processor=script_processor,
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model"
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs[
                "test"
            ].S3Output.S3Uri,
            destination="/opt/ml/processing/test"
        )
    ],
    outputs=[
        ProcessingOutput(
            output_name="evaluation", 
            source="/opt/ml/processing/evaluation",
        ),
    ],
    code="abalone/evaluation.py",
    property_files=[evaluation_report],
)

The evaluation script will take the trained model and the test dataset as input, then write out the desired regression evaluation metrics, like the mean squared error, as output.
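The evaluation script is not reproduced in this post, but its core metric computation can be sketched with a simplified, hypothetical helper (the real abalone/evaluation.py would first extract model.tar.gz, load the XGBoost booster, and predict on the test CSV before writing the report to /opt/ml/processing/evaluation/evaluation.json):

```python
def build_evaluation_report(y_true, y_pred):
    """Compute the mean squared error and wrap it in the nested
    regression-metrics structure expected by the PropertyFile.
    (Standard deviation handling is simplified to 0.0 here.)"""
    mse = sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true)
    return {"regression_metrics": {"mse": {"value": mse, "standard_deviation": 0.0}}}
```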

AbaloneCreateModel

After the model evaluation, we need to start preparing the steps we want our pipeline to execute if certain model performance conditions are met. We will start with defining the necessary steps for executing a batch transform job in order to get predictions for our entire test set.

In order to launch a batch transform job, we need to extract the trained model artifact from the output of the training step and convert it to a SageMaker Model instance that can be deployed to an endpoint. Even though we are not deploying our model into production (yet), it is important to note here that batch transform and persistent, real-time hosting in SageMaker use the same mechanics under the hood, the only difference being that real-time endpoints continue running, whereas batch transform resources are shut down once the job completes.

Along with the training step output, we need to define some additional inputs for our batch job model. We can instantiate a CreateModelInput object to specify the type of EC2 instance we would like to use for our batch endpoint deployment as well as an Amazon SageMaker Elastic Inference (EI) accelerator instance type. EI accelerators are network-attached devices that enable GPU-like speed for inference calls made to an endpoint but at a significantly lower cost.

from sagemaker.model import Model
from sagemaker.inputs import CreateModelInput
model = Model(
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=sagemaker_session,  # assumed defined earlier
    role=role,                            # SageMaker execution role, assumed defined earlier
)
inputs = CreateModelInput(
    instance_type="ml.m5.large",  # example instance type; choose your own
    accelerator_type="ml.eia1.medium",
)

NOTE: EI accelerators can only be used with algorithm frameworks that support GPU usage. If you wish to set up a pipeline that uses another algorithm besides XGBoost, be sure to check whether or not GPUs are also supported. If not, you can still initialize the model inputs, but without specifying an EI accelerator.

We can now define our CreateModelStep with our model and the corresponding model inputs.

from sagemaker.workflow.steps import CreateModelStep
step_create_model = CreateModelStep(
    name="AbaloneCreateModel",
    model=model,
    inputs=inputs,
)

AbaloneTransform

With our CreateModelStep, we are now ready to set up our batch transform step. We will start by creating a SageMaker Transformer instance with an appropriate EC2 instance type, instance count, and output location for where to write the batch results in S3. We also need to extract the ModelName property from step_create_model so the batch transform job knows which model to use when making predictions at inference time.

from sagemaker.transformer import Transformer
batch_path = f"s3://{default_bucket}/AbaloneTransform"
transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type="ml.m5.xlarge",  # example instance type; choose your own
    instance_count=1,
    output_path=batch_path,
)

Then we can simply create a TransformStep using the transformer instance we defined above and pass in the S3 path for the data we want to get batch predictions for.

from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep
step_transform = TransformStep(
    name="AbaloneTransform",
    transformer=transformer,
    inputs=TransformInput(
        data=batch_data,  # S3 URI of the data to get batch predictions for, assumed defined earlier
    ),
)

AbaloneRegisterModel

We also want to register our trained model if certain conditions are met. In order to register a model in the SageMaker Model Registry, we should first define some Model Metrics. These metrics will then be visible in the SageMaker Model Registry and can be used to compare different versions of a model within a model package group. We can again use the PropertyFile from the AbaloneEval step and define a ModelMetrics object, which will be passed to our RegisterModel step.

from sagemaker.model_metrics import MetricsSource, ModelMetrics 
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json"
    )
)

Next, we can construct a RegisterModel instance with our ModelMetrics and our Estimator instance we used for the AbaloneTrain step. We should also specify a ModelPackageGroup name. A model package group in SageMaker is a collection of model packages. If this step is executed, a new model package version for a particular model package group will be registered in the SageMaker Model Registry.

We also need to specify which data formats are acceptable for the model inputs (content_types) and outputs (response_types). This is also dependent on which algorithm you choose to train your model with. In SageMaker, the built-in XGBoost algorithm supports CSV and LibSVM data formats for training and inference, where LibSVM is the default for both.

Additionally, we can specify lists of appropriate EC2 instances for inference and transform jobs. inference_instances indicates which instance types can be used for generating inference predictions in real-time, and transform_instances lists which instance types are suitable for spinning up batch jobs or for deploying to an endpoint.

Then we should pass in an approval_status, which assigns a status to the newly registered model version in the Model Registry. The possible statuses are ‘Approved’, ‘Rejected’ and ‘PendingManualApproval’.

from sagemaker.workflow.step_collections import RegisterModel
step_register = RegisterModel(
    name="AbaloneRegisterModel",
    estimator=xgb,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    content_types=["text/csv"],    # example accepted input format
    response_types=["text/csv"],   # example output format
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],  # example instance types
    transform_instances=["ml.m5.xlarge"],                  # example instance types
    model_package_group_name=model_package_group_name,  # assumed defined earlier
    approval_status="PendingManualApproval",            # example status
    model_metrics=model_metrics,
)

The AbaloneCreateModel, AbaloneTransform and AbaloneRegisterModel steps will be passed into the condition step and will only be executed in the pipeline if the condition step evaluates to true.

AbaloneMSECond

For our last step, we need to define our ConditionStep to evaluate the condition of step properties and assess which subsequent pipeline steps should be executed. In order to set this up, we need to define a list of conditions to check. Because we are trying to solve a regression problem, we will focus on the mean squared error (MSE) metric calculated in the AbaloneEval step.

The MSE metric is something we want to minimize, so we will define a ConditionLessThanOrEqualTo condition object, indicating that we want our MSE value to be less than or equal to a pre-defined threshold of our choosing. We can check this condition by querying our JSON PropertyFile for the desired property using the json_path parameter.

from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import JsonGet

cond_lte = ConditionLessThanOrEqualTo(
    left=JsonGet(
        step=step_eval,
        property_file=evaluation_report,
        json_path="regression_metrics.mse.value"
    ),
    right=6.0
)

NOTE: The json_path refers to the nested JSON path according to the SageMaker Model Metrics regression format, and we want to check whether the MSE value calculated during the evaluation is less than or equal to our chosen threshold of 6.0.

Our condition step also needs a list of steps to execute if the MSE condition is met, as well as a list of steps to execute if the condition is not met. For the Abalone case, we want to execute the step_create_model and step_transform to launch a batch transform job. We also want to execute the step_register to register a new model version for a specific model package group.

Now to actually construct this step, we need to initialize a ConditionStep object and pass it the less than condition, then set the model package registration and batch transformation steps as the next steps if the condition passes.

from sagemaker.workflow.condition_step import (
    ConditionStep,
    JsonGet
)
step_cond = ConditionStep(
    name="AbaloneMSECond",
    conditions=[cond_lte],
    if_steps=[step_register, step_create_model, step_transform],
    else_steps=[],
)

NOTE: If the MSE condition is not met, we do not want to do anything; the pipeline should not execute any other steps.

Pipeline

Finally, we can assemble all of the steps that we have defined above into a Pipeline object. SageMaker Pipeline instances require a unique pipeline name and the list of steps to execute as part of the workflow. We can also specify some global pipeline parameter objects that will be used throughout the pipeline execution.
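The parameter definitions themselves are not listed in this post. As a hypothetical example (names and default values are assumptions), parameters are typically defined up front with the classes in sagemaker.workflow.parameters and then referenced by the individual steps:

```python
from sagemaker.workflow.parameters import ParameterInteger, ParameterString

# Hypothetical parameter definitions; names and default values are assumptions.
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
processing_instance_type = ParameterString(name="ProcessingInstanceType", default_value="ml.m5.xlarge")
training_instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.m5.xlarge")
model_approval_status = ParameterString(name="ModelApprovalStatus", default_value="PendingManualApproval")
input_data = ParameterString(name="InputData")   # S3 URI of the raw dataset
batch_data = ParameterString(name="BatchData")   # S3 URI of the batch transform input
```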

from sagemaker.workflow.pipeline import Pipeline
pipeline_name = "AbalonePipeline"
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        # pipeline parameter objects, assumed to be defined earlier
        processing_instance_type,
        processing_instance_count,
        training_instance_type,
        model_approval_status,
        input_data,
        batch_data,
    ],
    steps=[step_process, step_train, step_eval, step_cond],
)

After we’ve created a pipeline definition with the SageMaker Python SDK, we can submit our pipeline to SageMaker in order to start an actual execution. If a pipeline with the given pipeline_name already exists, its DAG will be updated with the newest changes. If it does not exist, a new pipeline will be created and its DAG will appear in the SageMaker Studio UI.

pipeline.upsert(role_arn=role)  # execution role SageMaker uses to create or update the pipeline
execution = pipeline.start()

Conclusion

This Abalone example demonstrates a lot of the various possibilities for setting up a basic SageMaker Pipeline. However, in the next blog post, we will explore some additional ways to adjust and improve on this example with other step configurations that can also be implemented in order to better suit your specific business problem needs.
