Azure Databricks Notebook in Azure ML pipeline

 

Photo by Tim Mossholder on Pexels.com

Machine learning pipelines are essential for automating machine learning workflows. They chain a sequence of data processing steps together to complete an ML solution. There are many workflow engines, such as MLflow (an open source project) and Kubeflow (another open source project), and at Microsoft we have Azure ML pipelines. In this blog, we are going to illustrate how to execute an Azure Databricks notebook in an Azure ML pipeline.

1. Sample Use Case

One example of "embedding" an Azure Databricks notebook in an Azure ML pipeline is the need to process massive data.

Azure ML Pipeline

The diagram above shows a simple ML pipeline where we have data preparation and training steps. Data is read from Azure Blob Storage, and the required data is not massive.

Then, we may have another data source which is massive. We want to use Azure Databricks to perform an ETL (Extract-Transform-Load) task that generates data for this ML pipeline. That is, Databricks takes in huge datasets for transformation and produces a smaller dataset for the ML pipeline. We can implement the ETL code independently in a Jupyter Notebook and then include it in the ML pipeline as shown below.

Databricks Step in ML Pipeline


The beautiful thing about including a Jupyter Notebook in the ML pipeline is that it seamlessly integrates two different efforts: Databricks performs massively parallel processing on big data, and Azure ML Service does data preparation and ML training.

In the rest of this blog, we focus solely on how to create a Databricks step in an ML pipeline with Python.

2. Setup Environments

2.1 Install packages

pip install azureml-core
pip install azureml
pip install azureml-defaults
pip install azureml-pipeline

2.2 Export Environment Parameters

PIPELINE_NAME=<name of pipeline, can be any string>
ML_SERVICE_NAME=<azure ml service name>
SUBSCRIPTION_ID=<your azure subscription id>
TENANT_ID=<your tenant id for the subscription>
SERVICE_PRINCIPAL_ID=<service principal id>
SERVICE_PRINCIPAL_PWD=<service principal secret>
RESOURCE_GROUP=<your azure resource group>

AML_DATABRICKS_WORKSPACE=<azure databricks workspace name>
AML_DATABRICKS_ACCESS_TOKEN=<azure databricks access token, we will provide information on this>
AML_DATABRICKS_CLUSTER_NOTEBOOK=<path to notebook e.g. /Users/goodfellow@microsoft.com/test >

AML_DATABRICKS_COMPUTE_NAME=<azure databricks compute name, can be any string>
AML_DATABRICKS_CLUSTER_NAME=<azure databricks cluster name, can be any string>
AML_DATABRICKS_CLUSTER_VM_TYPE=<azure databricks cluster virtual machine size e.g. Standard_DS3_v2 >
AML_DATABRICKS_CLUSTER_SPARK_VERSION=<azure databricks cluster spark version e.g. 7.0.x-scala2.12>

AML_DATABRICKS_CLUSTER_MIN_WORKERS=<azure databricks cluster minimum worker node e.g. 2>
AML_DATABRICKS_CLUSTER_MAX_WORKERS=<azure databricks cluster maximum worker node e.g. 8>
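
Because main.py reads all of its settings from the environment, a forgotten variable only surfaces later as a confusing SDK error. The following is a minimal sketch of a pre-flight check. The helper name missing_env_vars and the list REQUIRED_ENV_VARS are our own illustrations, not part of the Azure ML SDK; the variable names are the ones main.py reads.

```python
import os

# Names that main.py in this post reads and actually uses.
REQUIRED_ENV_VARS = [
    "PIPELINE_NAME",
    "ML_SERVICE_NAME",
    "SUBSCRIPTION_ID",
    "TENANT_ID",
    "SERVICE_PRINCIPAL_ID",
    "SERVICE_PRINCIPAL_PWD",
    "RESOURCE_GROUP",
    "AML_DATABRICKS_WORKSPACE",
    "AML_DATABRICKS_ACCESS_TOKEN",
    "AML_DATABRICKS_CLUSTER_NOTEBOOK",
    "AML_DATABRICKS_COMPUTE_NAME",
    "AML_DATABRICKS_CLUSTER_VM_TYPE",
    "AML_DATABRICKS_CLUSTER_SPARK_VERSION",
    "AML_DATABRICKS_CLUSTER_MIN_WORKERS",
    "AML_DATABRICKS_CLUSTER_MAX_WORKERS",
]


def missing_env_vars(required=REQUIRED_ENV_VARS, environ=None):
    """Return the names in `required` that are unset or empty in `environ`."""
    # Accept an explicit mapping so the check can be exercised without
    # touching the real process environment.
    environ = os.environ if environ is None else environ
    return [name for name in required if not environ.get(name)]
```

Calling missing_env_vars() with no arguments at the top of main() and stopping when the returned list is non-empty gives a readable message before any Azure call is made.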

To get the Databricks access token

Open Azure Databricks Studio from the Azure Portal. Click on the Databricks button in the top right-hand corner and select the User Settings menu. You will be presented with a page to generate an access token.

Generate Access Token

2.3 Source Code

Copy the code below to main.py and run it with python main.py.

""" main program to have Azure Databricks' jupyter notebook in ML pipeline """

import os

from azureml.core import Experiment
from azureml.core import Workspace
from azureml.core.authentication import ServicePrincipalAuthentication
from azureml.core.compute import ComputeTarget, DatabricksCompute
from azureml.core.compute_target import ComputeTargetException
from azureml.core.databricks import PyPiLibrary
from azureml.pipeline.core import Pipeline
from azureml.pipeline.steps import DatabricksStep

ENV_PIPELINE_NAME = "PIPELINE_NAME"
ENV_ML_SERVICE_NAME = "ML_SERVICE_NAME"
ENV_SUBSCRIPTION_ID = "SUBSCRIPTION_ID"
ENV_TENANT_ID = "TENANT_ID"
ENV_SERVICE_PRINCIPAL_ID = "SERVICE_PRINCIPAL_ID"
ENV_SERVICE_PRINCIPAL_PWD = "SERVICE_PRINCIPAL_PWD"

databricks_compute_name = os.environ.get("AML_DATABRICKS_COMPUTE_NAME")
databricks_cluster_name = os.environ.get("AML_DATABRICKS_CLUSTER_NAME")
databricks_workspace_name = os.environ.get("AML_DATABRICKS_WORKSPACE")
resource_group = os.environ.get("RESOURCE_GROUP")
databricks_access_token = os.environ.get("AML_DATABRICKS_ACCESS_TOKEN")
databricks_notebook = os.getenv("AML_DATABRICKS_CLUSTER_NOTEBOOK")
databricks_spark_version = os.getenv("AML_DATABRICKS_CLUSTER_SPARK_VERSION")
databricks_vm_type = os.getenv("AML_DATABRICKS_CLUSTER_VM_TYPE")
databricks_min_workers = os.getenv("AML_DATABRICKS_CLUSTER_MIN_WORKERS")
databricks_max_workers = os.getenv("AML_DATABRICKS_CLUSTER_MAX_WORKERS")


def attach_db_compute(ws):
    """ attach the Databricks workspace to the ML workspace as a compute target """
    try:
        # reuse the compute target if it has been attached before
        databricks_compute = ComputeTarget(workspace=ws, name=databricks_compute_name)
        print("Compute target already exists")
    except ComputeTargetException:
        attach_config = DatabricksCompute.attach_configuration(
            resource_group=resource_group,
            workspace_name=databricks_workspace_name,
            access_token=databricks_access_token,
        )
        databricks_compute = ComputeTarget.attach(ws, databricks_compute_name, attach_config)
        print(databricks_compute)
        databricks_compute.wait_for_completion(True)
    return databricks_compute


def create_db_step():
    return DatabricksStep(
        name="databricks_step",
        inputs=[],
        outputs=[],
        notebook_path=databricks_notebook,
        compute_target=databricks_compute_name,
        spark_version=databricks_spark_version,
        node_type=databricks_vm_type,
        min_workers=int(databricks_min_workers),
        max_workers=int(databricks_max_workers),
        # PyPI libraries listed here are installed on the cluster
        pypi_libraries=[PyPiLibrary(package="fire==0.3.1")],
        allow_reuse=True,
    )


def main():
    """ create the pipeline """

    # authenticate with a created service principal instead of the azure cli
    service_principal = ServicePrincipalAuthentication(
        tenant_id=os.getenv(ENV_TENANT_ID),
        service_principal_id=os.getenv(ENV_SERVICE_PRINCIPAL_ID),
        service_principal_password=os.getenv(ENV_SERVICE_PRINCIPAL_PWD),
    )

    # workspace name, subscription, resource group are externalized
    # see variable group name in azure-pipeline.yml
    workspace = Workspace.get(
        name=os.getenv(ENV_ML_SERVICE_NAME),
        subscription_id=os.getenv(ENV_SUBSCRIPTION_ID),
        resource_group=resource_group,
        auth=service_principal,
    )
    attach_db_compute(workspace)
    step = create_db_step()

    pipeline = Pipeline(workspace=workspace, steps=[step])
    print("Pipeline is built.")

    # Create an experiment and run the pipeline
    experiment = Experiment(workspace=workspace, name=os.getenv(ENV_PIPELINE_NAME))
    pipeline_run = experiment.submit(pipeline, regenerate_outputs=True)
    print("Pipeline submitted for execution.")

    pipeline_run.wait_for_completion()


if __name__ == "__main__":
    main()

The code is short and self-explanatory. It performs the following steps:

  1. Create a service principal object, service_principal
  2. Get the Azure ML Workspace with the service_principal credential
  3. Attach the Databricks compute to the ML Workspace, attach_db_compute
  4. Create the Databricks step, create_db_step
  5. Construct a pipeline with this step
  6. Create an experiment and execute the pipeline.
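
One fragile spot in step 4 is int(databricks_min_workers): when the variable is not exported, os.getenv returns None and int(None) raises a TypeError that does not name the missing setting. A minimal sketch of a stricter conversion follows. The helper name parse_worker_bounds is hypothetical, and the rule that the minimum must be non-negative and no larger than the maximum is our assumption, not something the SDK documents here.

```python
def parse_worker_bounds(min_raw, max_raw):
    """Convert raw environment strings to a (min_workers, max_workers) tuple.

    Raises ValueError with a readable message when a value is missing,
    not an integer, or the range is inverted.
    """
    if min_raw is None or max_raw is None:
        raise ValueError("both minimum and maximum worker counts must be set")
    # int() itself raises ValueError for non-numeric strings such as "two".
    min_workers, max_workers = int(min_raw), int(max_raw)
    # Assumed sanity rule: 0 <= min <= max.
    if min_workers < 0 or max_workers < min_workers:
        raise ValueError(
            f"invalid worker range: min={min_workers}, max={max_workers}"
        )
    return min_workers, max_workers
```

In create_db_step, the two int(...) calls could then be replaced by unpacking the result of parse_worker_bounds(databricks_min_workers, databricks_max_workers).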

Summary

We have provided a sample use case for including a Databricks Jupyter Notebook in an Azure ML Service pipeline. We have also provided the Python code to create an Azure ML Service pipeline with DatabricksStep. There are other things that you may need to figure out, such as passing environment parameters to the Databricks Jupyter Notebook. Please take a look at the official DatabricksStep documentation.
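
As a starting point for passing parameters, here is a minimal sketch based on our understanding that DatabricksStep accepts a notebook_params dictionary and that the notebook reads each entry as a widget through dbutils.widgets.get. Please verify both against the documentation for your SDK version. The function names build_notebook_params and create_db_step_with_params, and the parameter name input_date, are hypothetical.

```python
def build_notebook_params(values):
    """Stringify keys and values, since notebook widgets hold string values."""
    return {str(key): str(value) for key, value in values.items()}


def create_db_step_with_params(notebook_path, compute_name, params,
                               spark_version, node_type,
                               min_workers, max_workers):
    """Variant of create_db_step that forwards parameters to the notebook."""
    # Imported lazily so this module loads even without the Azure ML SDK.
    from azureml.pipeline.steps import DatabricksStep

    return DatabricksStep(
        name="databricks_step",
        notebook_path=notebook_path,
        # Assumption: each entry becomes a widget value inside the notebook.
        notebook_params=build_notebook_params(params),
        compute_target=compute_name,
        spark_version=spark_version,
        node_type=node_type,
        min_workers=min_workers,
        max_workers=max_workers,
        allow_reuse=True,
    )


# Inside the Databricks notebook, a parameter would then be read with:
#   input_date = dbutils.widgets.get("input_date")
```

Converting every value to a string up front keeps the notebook side simple: it always receives text and parses numbers or dates itself.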


