Machine learning pipelines are essential for automating machine learning workflows. A pipeline chains a sequence of data processing steps together to deliver a complete ML solution. There are many workflow engines, such as MLflow (an open source project) and Kubeflow (another open source project), and at Microsoft we have Azure ML pipelines. In this blog, we are going to illustrate how to execute an Azure Databricks notebook in an Azure ML pipeline.
1. Sample Use Case
One example of "embedding" an Azure Databricks notebook in an Azure ML pipeline is the need to process massive data.
Figure: Azure ML Pipeline
The diagram above shows a simple ML pipeline with data preparation and training steps. Data is read from Azure Blob Storage, and the size of the required data is not massive.
Then, we may have another data source which is massive. We want to use Azure Databricks to perform the ETL (Extract-Transform-Load) task that generates data for this ML pipeline. That is, Databricks takes in huge datasets, transforms them, and produces a smaller dataset for the ML pipeline. We can implement the ETL code independently in a Jupyter Notebook and then include it in the ML pipeline as shown below.
Figure: Databricks Step in ML Pipeline
The beautiful thing about including a Jupyter Notebook in the ML pipeline is that it seamlessly integrates two different efforts: using Databricks to perform massively parallel processing on big data, and using Azure ML Service to do data preparation and ML training.
In the rest of this blog, we focus solely on how to create a Databricks step in an ML pipeline in Python.
2. Setup Environments
2.1 Install packages
    pip install azureml.core
    pip install azureml
    pip install azureml-defaults
    pip install azureml.pipeline
2.2 Export Environment Parameters
    PIPELINE_NAME=<name of pipeline, can be any string>
    ML_SERVICE_NAME=<azure ml service name>
    SUBSCRIPTION_ID=<your azure subscription id>
    TENANT_ID=<your tenant id for the subscription>
    SERVICE_PRINCIPAL_ID=<service principal id>
    SERVICE_PRINCIPAL_PWD=<service principal secret>
    RESOURCE_GROUP=<your azure resource group>
    AML_DATABRICKS_WORKSPACE=<azure databricks workspace name>
    AML_DATABRICKS_ACCESS_TOKEN=<azure databricks access token, we will provide information on this>
    AML_DATABRICKS_CLUSTER_NOTEBOOK=<path to notebook e.g. /Users/goodfellow@microsoft.com/test>
    AML_DATABRICKS_COMPUTE_NAME=<azure databricks compute name, can be any string>
    AML_DATABRICKS_CLUSTER_NAME=<azure databricks cluster name, can be any string>
    AML_DATABRICKS_CLUSTER_VM_TYPE=<azure databricks cluster virtual machine size e.g. Standard_DS3_v2>
    AML_DATABRICKS_CLUSTER_SPARK_VERSION=<azure databricks cluster spark version e.g. 7.0.x-scala2.12>
    AML_DATABRICKS_CLUSTER_MIN_WORKERS=<azure databricks cluster minimum worker node e.g. 2>
    AML_DATABRICKS_CLUSTER_MAX_WORKERS=<azure databricks cluster maximum worker node e.g. 8>
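Since main.py reads all of its settings from the environment, a missing variable only surfaces later as a confusing error. As a minimal sketch (the helper name `missing_vars` is hypothetical and not part of the original program), the variable names that main.py reads could be checked up front like this:

```python
import os

# Variable names that main.py reads via os.environ.get / os.getenv.
REQUIRED_VARS = [
    "PIPELINE_NAME", "ML_SERVICE_NAME", "SUBSCRIPTION_ID", "TENANT_ID",
    "SERVICE_PRINCIPAL_ID", "SERVICE_PRINCIPAL_PWD", "RESOURCE_GROUP",
    "AML_DATABRICKS_WORKSPACE", "AML_DATABRICKS_ACCESS_TOKEN",
    "AML_DATABRICKS_CLUSTER_NOTEBOOK", "AML_DATABRICKS_COMPUTE_NAME",
    "AML_DATABRICKS_CLUSTER_NAME", "AML_DATABRICKS_CLUSTER_VM_TYPE",
    "AML_DATABRICKS_CLUSTER_SPARK_VERSION",
    "AML_DATABRICKS_CLUSTER_MIN_WORKERS", "AML_DATABRICKS_CLUSTER_MAX_WORKERS",
]


def missing_vars(environ=None):
    """Return the required variable names that are unset or empty.

    `environ` defaults to os.environ; passing a plain dict makes the
    function easy to try out without touching the real environment.
    """
    env = os.environ if environ is None else environ
    return [name for name in REQUIRED_VARS if not env.get(name)]
```

Calling `missing_vars()` at the start of `main()` and stopping when the returned list is not empty would be one way to fail early with a readable message.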
To get the Databricks access token:
Open Azure Databricks Studio from the Azure Portal. Click the Databricks button in the top right-hand corner and select the User Settings menu. You will be presented with a page where you can generate an access token.
Figure: Generate Access Token
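Before wiring the token into the pipeline, it can help to confirm that the workspace accepts it. The sketch below is an optional check and not part of the original program: it calls the Databricks REST endpoint `/api/2.0/clusters/list` with the token as a bearer token, using only the standard library. The function names and the workspace URL argument are assumptions; note that this is the workspace URL shown in your browser, not the workspace name stored in AML_DATABRICKS_WORKSPACE.

```python
import urllib.error
import urllib.request


def build_clusters_request(workspace_url, token):
    """Build (but do not send) a GET request for the clusters list endpoint."""
    url = workspace_url.rstrip("/") + "/api/2.0/clusters/list"
    return urllib.request.Request(url, headers={"Authorization": "Bearer " + token})


def token_is_accepted(workspace_url, token, timeout=10):
    """Return True if the workspace answers the request with HTTP 200.

    HTTP errors such as 401 or 403 are reported as False. Network
    failures (urllib.error.URLError) are deliberately left to propagate.
    """
    request = build_clusters_request(workspace_url, token)
    try:
        with urllib.request.urlopen(request, timeout=timeout) as response:
            return response.status == 200
    except urllib.error.HTTPError:
        return False
```

Keeping the request construction separate from the network call makes the header and URL easy to inspect without contacting the workspace.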
2.3 Source Code
Copy the code below to main.py and execute it with python main.py.
""" main program to have Azure Databricks' jupyter notebook in ML pipeline """ import os from azureml.core import Environment from azureml.core import Experiment from azureml.core import Workspace from azureml.core.authentication import ServicePrincipalAuthentication from azureml.core.compute import ComputeTarget, DatabricksCompute from azureml.core.compute_target import ComputeTargetException from azureml.core.databricks import PyPiLibrary from azureml.core.runconfig import RunConfiguration from azureml.pipeline.core import Pipeline from azureml.pipeline.steps import DatabricksStep ENV_PIPELINE_NAME = "PIPELINE_NAME" ENV_ML_SERVICE_NAME = "ML_SERVICE_NAME" ENV_SUBSCRIPTION_ID = "SUBSCRIPTION_ID" ENV_TENANT_ID = "TENANT_ID" ENV_SERVICE_PRINCIPAL_ID = "SERVICE_PRINCIPAL_ID" ENV_SERVICE_PRINCIPAL_PWD = "SERVICE_PRINCIPAL_PWD" databricks_compute_name = os.environ.get("AML_DATABRICKS_COMPUTE_NAME") databricks_cluster_name = os.environ.get("AML_DATABRICKS_CLUSTER_NAME") databricks_workspace_name = os.environ.get("AML_DATABRICKS_WORKSPACE") resource_group = os.environ.get("RESOURCE_GROUP") databricks_access_token = os.environ.get("AML_DATABRICKS_ACCESS_TOKEN") databricks_notebook = os.getenv("AML_DATABRICKS_CLUSTER_NOTEBOOK") databricks_spark_version = os.getenv("AML_DATABRICKS_CLUSTER_SPARK_VERSION") databricks_vm_type = os.getenv("AML_DATABRICKS_CLUSTER_VM_TYPE") databricks_min_workers = os.getenv("AML_DATABRICKS_CLUSTER_MIN_WORKERS") databricks_max_workers = os.getenv("AML_DATABRICKS_CLUSTER_MAX_WORKERS") def attach_db_compute(ws): try: databricks_compute = ComputeTarget(workspace=ws, name=databricks_compute_name) print("Compute target already exists") except ComputeTargetException: attach_config = DatabricksCompute.attach_configuration( resource_group=resource_group, workspace_name=databricks_workspace_name, access_token=databricks_access_token, ) databricks_compute = ComputeTarget.attach(ws, databricks_compute_name, attach_config) print(databricks_compute) 
databricks_compute.wait_for_completion(True) def create_db_step(): return DatabricksStep( name="databricks_step", inputs=[], outputs=[], notebook_path=databricks_notebook, compute_target=databricks_compute_name, spark_version=databricks_spark_version, node_type=databricks_vm_type, min_workers=int(databricks_min_workers), max_workers=int(databricks_max_workers), # this is the way to add python library to cluster pypi_libraries=[PyPiLibrary(package="fire==0.3.1")], allow_reuse=True, ) def main(): """ create the pipeline """ """ created service principal vs azure cli """ service_principal = ServicePrincipalAuthentication( tenant_id=os.getenv(ENV_TENANT_ID), service_principal_id=os.getenv(ENV_SERVICE_PRINCIPAL_ID), service_principal_password=os.getenv(ENV_SERVICE_PRINCIPAL_PWD), ) """ workspace name, subscription, resource group are externalized see variable group name in azure-pipeline.yml """ workspace = Workspace.get( name=os.getenv(ENV_ML_SERVICE_NAME), subscription_id=os.getenv(ENV_SUBSCRIPTION_ID), resource_group=resource_group, auth=service_principal, ) attach_db_compute(workspace) step = create_db_step() pipeline = Pipeline(workspace=workspace, steps=[step]) print("Pipeline is built.") # Create an experiment and run the pipeline experiment = Experiment(workspace=workspace, name=os.getenv(ENV_PIPELINE_NAME)) pipeline_run = experiment.submit(pipeline, regenerate_outputs=True) print("Pipeline submitted for execution.") pipeline_run.wait_for_completion() if __name__ == "__main__": main()
The code is short and self-explanatory. It performs the following steps:
- Create a Service Principal Object, service_principal
- Get the Azure ML Workspace with the service_principal credential
- Attach the Databricks compute to the ML Workspace, attach_db_compute
- Create the Databricks step, create_db_step
- Construct a pipeline with this step
- Create an experiment and execute the pipeline.
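One detail worth noting in create_db_step: `int(databricks_min_workers)` raises a TypeError when the environment variable is unset, because `os.getenv` then returns None. A minimal sketch of a more defensive parser is shown below. The helper name `read_worker_bounds` is hypothetical, and the fallback values 2 and 8 are assumptions borrowed from the examples in the parameter list, not recommended settings.

```python
import os


def read_worker_bounds(environ=None, default_min=2, default_max=8):
    """Parse the autoscaling bounds, falling back to assumed defaults.

    Raises ValueError if a value is not an integer or if the maximum
    is smaller than the minimum.
    """
    env = os.environ if environ is None else environ
    # `or` covers both an unset variable (None) and an empty string.
    min_workers = int(env.get("AML_DATABRICKS_CLUSTER_MIN_WORKERS") or default_min)
    max_workers = int(env.get("AML_DATABRICKS_CLUSTER_MAX_WORKERS") or default_max)
    if max_workers < min_workers:
        raise ValueError(
            f"max workers ({max_workers}) is smaller than min workers ({min_workers})"
        )
    return min_workers, max_workers
```

The returned pair could then be passed to the `min_workers` and `max_workers` arguments of DatabricksStep in place of the two `int(...)` calls.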