diff --git a/pipeline/20news.pkl b/pipeline/20news.pkl
new file mode 100644
index 000000000..396e01775
Binary files /dev/null and b/pipeline/20news.pkl differ
diff --git a/pipeline/README.md b/pipeline/README.md
new file mode 100644
index 000000000..6690437d8
--- /dev/null
+++ b/pipeline/README.md
@@ -0,0 +1,48 @@
+# Azure Machine Learning Pipeline
+
+## Overview
+
+[Azure Machine Learning Pipelines](https://docs.microsoft.com/en-us/azure/machine-learning/service/concept-ml-pipelines) enable data scientists to create and manage multiple simple and complex workflows concurrently. A typical pipeline has multiple tasks to prepare data, train, deploy, and evaluate models. Individual steps in the pipeline can use diverse compute options (for example, CPU for data preparation and GPU for training) and languages.
+
+The Python-based Azure Machine Learning Pipeline SDK provides interfaces to work with Azure Machine Learning Pipelines. To get started quickly, the SDK includes imperative constructs for sequencing and parallelizing steps. By declaring data dependencies, you let the service optimize execution of the tasks. The SDK can be used from a Jupyter notebook or any other preferred IDE, and it includes a framework of pre-built modules for common tasks such as data transfer and compute provisioning.
+
+Data management and reuse across pipelines and pipeline runs are simplified through named, strictly versioned data sources and named inputs and outputs for processing tasks. Pipelines enable collaboration across teams of data scientists by recording all intermediate tasks and data.
+
+### Why build pipelines?
+
+With pipelines, you can optimize your workflow for simplicity, speed, portability, and reuse. When building pipelines with Azure Machine Learning, you can focus on what you know best, machine learning, rather than on infrastructure.
+
+Using distinct steps makes it possible to rerun only the steps you need as you tweak and test your workflow. Once the pipeline is designed, most of the fine-tuning tends to happen around the training loop. When you rerun a pipeline, execution jumps to the steps that need to be rerun, such as a step with an updated training script, and skips steps whose scripts and metadata have not changed.
+
+With Azure Machine Learning, you can use distinct toolkits and frameworks for each step in your pipeline. Azure coordinates the various compute targets you use so that intermediate data can be shared with downstream compute targets easily.
+
+![MLLifecycle](aml-pipelines-concept.png)
+
+
+### Azure Machine Learning Pipelines Features
+Azure Machine Learning Pipelines are optimized for simplicity, speed, and efficiency. The following key concepts make it possible for a data scientist to focus on ML rather than infrastructure.
+
+**Unattended execution**: Schedule a few scripts to run in parallel or in sequence in a reliable and unattended manner. Since data preparation and modeling can last days or weeks, you can focus on other tasks while your pipeline is running.
+
+**Mixed and diverse compute**: Use multiple pipelines that are reliably coordinated across heterogeneous and scalable compute resources and storage locations. Individual pipeline steps can run on different compute targets, such as HDInsight, GPU Data Science VMs, and Databricks, to make efficient use of the available compute options.
+
+**Reusability**: Pipelines can be templatized for specific scenarios such as retraining and batch scoring. They can be triggered from external systems via simple REST calls.
+
+**Tracking and versioning**: Instead of manually tracking data and result paths as you iterate, use the pipelines SDK to explicitly name and version your data sources, inputs, and outputs, and to manage scripts and data separately for increased productivity.
+
+### Notebooks
+
+In this directory, there are two types of notebooks:
+
+* The first type introduces the core Azure Machine Learning Pipelines features. These notebooks belong in this category and are designed to be worked through in sequence:
+
+1. aml-pipelines-getting-started.ipynb
+2. aml-pipelines-with-data-dependency-steps.ipynb
+3. aml-pipelines-publish-and-run-using-rest-endpoint.ipynb
+4. aml-pipelines-data-transfer.ipynb
+5. aml-pipelines-use-databricks-as-compute-target.ipynb
+6. aml-pipelines-use-adla-as-compute-target.ipynb
+
+* The second type illustrates more sophisticated scenarios; these notebooks are independent of each other. They include:
+  - pipeline-batch-scoring.ipynb
+  - pipeline-style-transfer.ipynb
diff --git a/pipeline/aml-pipelines-concept.png b/pipeline/aml-pipelines-concept.png
new file mode 100644
index 000000000..b01526dab
Binary files /dev/null and b/pipeline/aml-pipelines-concept.png differ
diff --git a/pipeline/aml-pipelines-data-transfer.ipynb b/pipeline/aml-pipelines-data-transfer.ipynb
new file mode 100644
index 000000000..ec485d5d8
--- /dev/null
+++ b/pipeline/aml-pipelines-data-transfer.ipynb
@@ -0,0 +1,336 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Copyright (c) Microsoft Corporation. All rights reserved. \n",
+    "Licensed under the MIT License."
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "# Azure Machine Learning Pipeline with DataTransferStep\n",
+    "This notebook demonstrates the use of DataTransferStep in an Azure Machine Learning Pipeline.\n",
+    "\n",
+    "In certain cases, you will need to transfer data from one data location to another. For example, your data may be in Azure Files storage and you may want to move it to Blob storage, or your data may be in an ADLS account that you want to make available in Blob storage. The built-in **DataTransferStep** class helps you transfer data in these situations.\n",
+    "\n",
+    "The example below shows how to move data from an ADLS account to Blob storage."
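+    ,"\n",
+    "\n",
+    "At a high level, the notebook registers the two datastores, wraps the source and destination paths in `DataReference` objects, and hands them to a `DataTransferStep` that runs on an Azure Data Factory compute target. The cells that follow build each of these pieces up in turn; the sketch here only previews the overall pattern, and the variable names and paths in it are placeholders rather than values from this notebook:\n",
+    "\n",
+    "```python\n",
+    "# Preview of the pattern built in the cells below (placeholder names and paths).\n",
+    "adls_ref = DataReference(datastore=adls_datastore,\n",
+    "                         data_reference_name=\"adls_source\",\n",
+    "                         path_on_datastore=\"source/folder\")\n",
+    "blob_ref = DataReference(datastore=blob_datastore,\n",
+    "                         data_reference_name=\"blob_destination\",\n",
+    "                         path_on_datastore=\"destination/folder\")\n",
+    "transfer = DataTransferStep(name=\"copy_adls_to_blob\",\n",
+    "                            source_data_reference=adls_ref,\n",
+    "                            destination_data_reference=blob_ref,\n",
+    "                            compute_target=data_factory_compute)\n",
+    "pipeline = Pipeline(workspace=ws, steps=[transfer])\n",
+    "```"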
+ ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Azure Machine Learning and Pipeline SDK-specific imports" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "import azureml.core\n", + "from azureml.core.compute import ComputeTarget, DatabricksCompute, DataFactoryCompute\n", + "from azureml.exceptions import ComputeTargetException\n", + "from azureml.core import Workspace, Run, Experiment\n", + "from azureml.pipeline.core import Pipeline, PipelineData\n", + "from azureml.pipeline.steps import AdlaStep\n", + "from azureml.core.datastore import Datastore\n", + "from azureml.data.data_reference import DataReference\n", + "from azureml.data.sql_data_reference import SqlDataReference\n", + "from azureml.core import attach_legacy_compute_target\n", + "from azureml.data.stored_procedure_parameter import StoredProcedureParameter, StoredProcedureParameterType\n", + "from azureml.pipeline.steps import DataTransferStep\n", + "\n", + "# Check core SDK version number\n", + "print(\"SDK version:\", azureml.core.VERSION)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Initialize Workspace\n", + "\n", + "Initialize a workspace object from persisted configuration. Make sure the config file is present at .\\config.json\n", + "\n", + "If you don't have a config.json file, please go through the configuration Notebook located here:\n", + "https://github.com/Azure/MachineLearningNotebooks. \n", + "\n", + "This sets you up with a working config file that has information on your workspace, subscription id, etc. " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "tags": [ + "create workspace" + ] + }, + "outputs": [], + "source": [ + "ws = Workspace.from_config()\n", + "print(ws.name, ws.resource_group, ws.location, ws.subscription_id, sep = '\\n')" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Register Datastores\n", + "\n", + "In the code cell below, you will need to fill in the appropriate values for the workspace name, datastore name, subscription id, resource group, store name, tenant id, client id, and client secret that are associated with your ADLS datastore. 
\n", + "\n", + "For background on registering your data store, consult this article:\n", + "\n", + "https://docs.microsoft.com/en-us/azure/data-lake-store/data-lake-store-service-to-service-authenticate-using-active-directory" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# un-comment the following and replace the strings with the \n", + "# correct values for your ADLS datastore\n", + "\n", + "# workspace = \"\"\n", + "# datastore_name = \"\" # ADLS datastore name\n", + "# subscription_id = \"\" # subscription id of ADLS account\n", + "# resource_group = \"\" # resource group of ADLS account\n", + "# store_name = \"\" # ADLS account name\n", + "# tenant_id = \"\" # tenant id of service principal\n", + "# client_id = \"\" # client id of service principal\n", + "# client_secret = \"\" # the secret of service principal\n", + "\n", + "\n", + "try:\n", + " adls_datastore = Datastore.get(ws, datastore_name)\n", + " print(\"found datastore with name: %s\" % datastore_name)\n", + "except:\n", + " adls_datastore = Datastore.register_azure_data_lake(\n", + " workspace=ws,\n", + " datastore_name=datastore_name,\n", + " subscription_id=subscription_id, # subscription id of ADLS account\n", + " resource_group=resource_group, # resource group of ADLS account\n", + " store_name=store_name, # ADLS account name\n", + " tenant_id=tenant_id, # tenant id of service principal\n", + " client_id=client_id, # client id of service principal\n", + " client_secret=client_secret) # the secret of service principal\n", + " print(\"registered datastore with name: %s\" % datastore_name)\n", + "\n", + "# un-comment the following and replace the strings with the\n", + "# correct values for your blob datastore\n", + "\n", + "# blob_datastore_name = \"\"\n", + "# account_name = \"\"\n", + "# container_name = \"\"\n", + "# account_key = \"\"\n", + "\n", + "try:\n", + " blob_datastore = Datastore.get(ws, blob_datastore_name)\n", + " print(\"found blob datastore with name: %s\" % blob_datastore_name)\n", + "except:\n", + " blob_datastore = Datastore.register_azure_blob_container(\n", + " workspace=ws,\n", + " datastore_name=blob_datastore_name,\n", + " account_name=account_name, # Storage account name\n", + " container_name=container_name, # Name of Azure blob container\n", + " account_key=account_key) # Storage account key\"\n", + " print(\"registered blob datastore with name: %s\" % blob_datastore_name)\n", + "\n", + "# CLI:\n", + "# az ml datastore register-blob -n -a -c -k [-t ]" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Create DataReferences" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "adls_datastore = Datastore(workspace=ws, name=\"MyAdlsDatastore\")\n", + "\n", + "# adls\n", + "adls_data_ref = DataReference(\n", + " datastore=adls_datastore,\n", + " data_reference_name=\"adls_test_data\",\n", + " path_on_datastore=\"testdata\")\n", + "\n", + "blob_datastore = Datastore(workspace=ws, name=\"MyBlobDatastore\")\n", + "\n", + "# blob data\n", + "blob_data_ref = DataReference(\n", + " datastore=blob_datastore,\n", + " data_reference_name=\"blob_test_data\",\n", + " path_on_datastore=\"testdata\")\n", + "\n", + "print(\"obtained adls, blob data references\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Setup Data Factory Account" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + 
"outputs": [], + "source": [ + "data_factory_name = 'adftest'\n", + "\n", + "def get_or_create_data_factory(workspace, factory_name):\n", + " try:\n", + " return DataFactoryCompute(workspace, factory_name)\n", + " except ComputeTargetException as e:\n", + " if 'ComputeTargetNotFound' in e.message:\n", + " print('Data factory not found, creating...')\n", + " provisioning_config = DataFactoryCompute.provisioning_configuration()\n", + " data_factory = ComputeTarget.create(workspace, factory_name, provisioning_config)\n", + " data_factory.wait_for_provisioning()\n", + " return data_factory\n", + " else:\n", + " raise e\n", + " \n", + "data_factory_compute = get_or_create_data_factory(ws, data_factory_name)\n", + "\n", + "print(\"setup data factory account complete\")\n", + "\n", + "# CLI:\n", + "# Create: az ml computetarget setup datafactory -n \n", + "# BYOC: az ml computetarget attach datafactory -n -i " + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Create a DataTransferStep" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "**DataTransferStep** is used to transfer data between Azure Blob, Azure Data Lake Store, and Azure SQL database.\n", + "\n", + "- **name:** Name of module\n", + "- **source_data_reference:** Input connection that serves as source of data transfer operation.\n", + "- **destination_data_reference:** Input connection that serves as destination of data transfer operation.\n", + "- **compute_target:** Azure Data Factory to use for transferring data.\n", + "- **allow_reuse:** Whether the step should reuse results of previous DataTransferStep when run with same inputs. Set as False to force data to be transferred again.\n", + "\n", + "Optional arguments to explicitly specify whether a path corresponds to a file or a directory. These are useful when storage contains both file and directory with the same name or when creating a new destination path.\n", + "\n", + "- **source_reference_type:** An optional string specifying the type of source_data_reference. Possible values include: 'file', 'directory'. When not specified, we use the type of existing path or directory if it's a new path.\n", + "- **destination_reference_type:** An optional string specifying the type of destination_data_reference. Possible values include: 'file', 'directory'. When not specified, we use the type of existing path or directory if it's a new path." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "transfer_adls_to_blob = DataTransferStep(\n", + " name=\"transfer_adls_to_blob\",\n", + " source_data_reference=adls_data_ref,\n", + " destination_data_reference=blob_data_ref,\n", + " compute_target=data_factory_compute)\n", + "\n", + "print(\"data transfer step created\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Build and Submit the Experiment" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "pipeline = Pipeline(\n", + " description=\"data_transfer_101\",\n", + " workspace=ws,\n", + " steps=[transfer_adls_to_blob])\n", + "\n", + "pipeline_run = Experiment(ws, \"Data_Transfer_example\").submit(pipeline)\n", + "pipeline_run.wait_for_completion()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### View Run Details" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from azureml.widgets import RunDetails\n", + "RunDetails(pipeline_run).show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Next: Databricks as a Compute Target\n", + "To use Databricks as a compute target from Azure Machine Learning Pipeline, a DatabricksStep is used. This [notebook](./aml-pipelines-use-databricks-as-compute-target.ipynb) demonstrates the use of a DatabricksStep in an Azure Machine Learning Pipeline." + ] + } + ], + "metadata": { + "authors": [ + { + "name": "diray" + } + ], + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.6.7" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/pipeline/aml-pipelines-getting-started.ipynb b/pipeline/aml-pipelines-getting-started.ipynb new file mode 100644 index 000000000..516b7aab8 --- /dev/null +++ b/pipeline/aml-pipelines-getting-started.ipynb @@ -0,0 +1,631 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Copyright (c) Microsoft Corporation. All rights reserved. \n", + "Licensed under the MIT License." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Azure Machine Learning Pipelines: Getting Started\n", + "\n", + "## Overview\n", + "\n", + "Read [Azure Machine Learning Pipelines](https://docs.microsoft.com/en-us/azure/machine-learning/service/concept-ml-pipelines) overview, or the [readme article](./README.md) on Azure Machine Learning Pipelines to get more information.\n", + " \n", + "\n", + "This Notebook shows basic construction of a **pipeline** that runs jobs unattended in different compute clusters. " + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Prerequisites and Azure Machine Learning Basics\n", + "Make sure you go through the configuration Notebook located at https://github.com/Azure/MachineLearningNotebooks first if you haven't. This sets you up with a working config file that has information on your workspace, subscription id, etc. \n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Installing Packages\n", + "These packages are used at later stages." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "!pip install pandas\n", + "!pip install requests" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Enabling Widgets\n", + "\n", + "Install the following jupyter extensions to support Azure Machine Learning widgets." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "!pip install azureml.widgets\n", + "!jupyter nbextension install --py --user azureml.widgets\n", + "!jupyter nbextension enable --py --user azureml.widgets" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Azure Machine Learning Imports" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import azureml.core\n", + "from azureml.core import Workspace, Run, Experiment, Datastore\n", + "from azureml.core.compute import AmlCompute\n", + "from azureml.core.compute import ComputeTarget\n", + "from azureml.core.compute import DataFactoryCompute\n", + "from azureml.widgets import RunDetails\n", + "\n", + "# Check core SDK version number\n", + "print(\"SDK version:\", azureml.core.VERSION)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Pipeline SDK-specific imports" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from azureml.data.data_reference import DataReference\n", + "from azureml.pipeline.core import Pipeline, PipelineData, StepSequence\n", + "from azureml.pipeline.steps import PythonScriptStep\n", + "from azureml.pipeline.steps import DataTransferStep\n", + "from azureml.pipeline.core import PublishedPipeline\n", + "from azureml.pipeline.core.graph import PipelineParameter\n", + "\n", + "print(\"Pipeline SDK-specific imports completed\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Initialize Workspace\n", + "\n", + "Initialize a [workspace](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.core.workspace(class%29) object from persisted configuration." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "tags": [ + "create workspace" + ] + }, + "outputs": [], + "source": [ + "ws = Workspace.from_config()\n", + "print(ws.name, ws.resource_group, ws.location, ws.subscription_id, sep = '\\n')\n", + "\n", + "# Default datastore (Azure file storage)\n", + "def_file_store = ws.get_default_datastore() \n", + "# The above call is equivalent to Datastore(ws, \"workspacefilestore\") or simply Datastore(ws)\n", + "print(\"Default datastore's name: {}\".format(def_file_store.name))\n", + "\n", + "# Blob storage associated with the workspace\n", + "# The following call GETS the Azure Blob Store associated with your workspace.\n", + "# Note that workspaceblobstore is **the name of this store and CANNOT BE CHANGED and must be used as is** \n", + "def_blob_store = Datastore(ws, \"workspaceblobstore\")\n", + "print(\"Blobstore's name: {}\".format(def_blob_store.name))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# project folder\n", + "project_folder = '.'\n", + " \n", + "print('Sample projects will be created in {}.'.format(project_folder))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Required data and script files for the the tutorial\n", + "Sample files required to finish this tutorial are already copied to the project folder specified above. Even though the .py provided in the samples don't have much \"ML work,\" as a data scientist, you will work on this extensively as part of your work. To complete this tutorial, the contents of these files are not very important. The one-line files are for demostration purpose only." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Datastore concepts\n", + "A [Datastore](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.core.datastore(class) is a place where data can be stored that is then made accessible to a compute either by means of mounting or copying the data to the compute target. \n", + "\n", + "A Datastore can either be backed by an Azure File Storage (default) or by an Azure Blob Storage.\n", + "\n", + "In this next step, we will upload the training and test set into the workspace's default storage (File storage), and another piece of data to Azure Blob Storage. When to use [Azure Blobs](https://docs.microsoft.com/en-us/azure/storage/blobs/storage-blobs-introduction), [Azure Files](https://docs.microsoft.com/en-us/azure/storage/files/storage-files-introduction), or [Azure Disks](https://docs.microsoft.com/en-us/azure/virtual-machines/linux/managed-disks-overview) is [detailed here](https://docs.microsoft.com/en-us/azure/storage/common/storage-decide-blobs-files-disks).\n", + "\n", + "**Please take good note of the concept of the datastore.**" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Upload data to default datastore\n", + "Default datastore on workspace is the Azure File storage. The workspace has a Blob storage associated with it as well. Let's upload a file to each of these storages." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# get_default_datastore() gets the default Azure File Store associated with your workspace.\n", + "# Here we are reusing the def_file_store object we obtained earlier\n", + "\n", + "# target_path is the directory at the destination\n", + "def_file_store.upload_files(['./20news.pkl'], \n", + " target_path = '20newsgroups', \n", + " overwrite = True, \n", + " show_progress = True)\n", + "\n", + "# Here we are reusing the def_blob_store we created earlier\n", + "def_blob_store.upload_files([\"./20news.pkl\"], target_path=\"20newsgroups\", overwrite=True)\n", + "\n", + "print(\"Upload calls completed\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### (Optional) See your files using Azure Portal\n", + "Once you successfully uploaded the files, you can browse to them (or upload more files) using [Azure Portal](https://portal.azure.com). At the portal, make sure you have selected **AzureML Nursery** as your subscription (click *Resource Groups* and then select the subscription). Then look for your **Machine Learning Workspace** (it has your *alias* as the name). It has a link to your storage. Click on the storage link. It will take you to a page where you can see [Blobs](https://docs.microsoft.com/en-us/azure/storage/blobs/storage-blobs-introduction), [Files](https://docs.microsoft.com/en-us/azure/storage/files/storage-files-introduction), [Tables](https://docs.microsoft.com/en-us/azure/storage/tables/table-storage-overview), and [Queues](https://docs.microsoft.com/en-us/azure/storage/queues/storage-queues-introduction). We have just uploaded a file to the Blob storage and another one to the File storage. You should be able to see both of these files in their respective locations. " + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Compute Targets\n", + "A compute target specifies where to execute your program such as a remote Docker on a VM, or a cluster. A compute target needs to be addressable and accessible by you.\n", + "\n", + "**You need at least one compute target to send your payload to. We are planning to use Azure Machine Learning Compute exclusively for this tutorial for all steps. However in some cases you may require multiple compute targets as some steps may run in one compute target like Azure Machine Learning Compute, and some other steps in the same pipeline could run in a different compute target.**\n", + "\n", + "*The example belows show creating/retrieving/attaching to an Azure Machine Learning Compute instance.*" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### List of Compute Targets on the workspace" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "cts = ws.compute_targets\n", + "for ct in cts:\n", + " print(ct)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Retrieve or create a Azure Machine Learning compute\n", + "Azure Machine Learning Compute is a service for provisioning and managing clusters of Azure virtual machines for running machine learning workloads. Let's create a new Azure Machine Learning Compute in the current workspace, if it doesn't already exist. We will then run the training script on this compute target.\n", + "\n", + "If we could not find the compute with the given name in the previous cell, then we will create a new compute here. 
We will create an Azure Machine Learning Compute containing **STANDARD_D2_V2 CPU VMs**. This process is broken down into the following steps:\n", + "\n", + "1. Create the configuration\n", + "2. Create the Azure Machine Learning compute\n", + "\n", + "**This process will take about 3 minutes and is providing only sparse output in the process. Please make sure to wait until the call returns before moving to the next cell.**" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "\n", + "aml_compute_target = \"aml-compute\"\n", + "try:\n", + " aml_compute = AmlCompute(ws, aml_compute_target)\n", + " print(\"found existing compute target.\")\n", + "except:\n", + " print(\"creating new compute target\")\n", + " \n", + " provisioning_config = AmlCompute.provisioning_configuration(vm_size = \"STANDARD_D2_V2\",\n", + " min_nodes = 1, \n", + " max_nodes = 4) \n", + " aml_compute = ComputeTarget.create(ws, aml_compute_target, provisioning_config)\n", + " aml_compute.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)\n", + " \n", + "print(\"Azure Machine Learning Compute attached\")\n", + "# For a more detailed view of current Azure Machine Learning Compute status, use the 'status' property \n", + "print(aml_compute.status.serialize())" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "**Wait for this call to finish before proceeding (you will see the asterisk turning to a number).**\n", + "\n", + "Now that you have created the compute target, let's see what the workspace's compute_targets() function returns. You should now see one entry named 'amlcompute' of type AmlCompute." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "**Now that we have completed learning the basics of Azure Machine Learning (AML), let's go ahead and start understanding the Pipeline concepts.**" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Creating a Step in a Pipeline\n", + "A Step is a unit of execution. Step typically needs a target of execution (compute target), a script to execute, and may require script arguments and inputs, and can produce outputs. The step also could take a number of other parameters. 
Azure Machine Learning Pipelines provides the following built-in Steps:\n", + "\n", + "- [**PythonScriptStep**](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-steps/azureml.pipeline.steps.python_script_step.pythonscriptstep?view=azure-ml-py): Add a step to run a Python script in a Pipeline.\n", + "- [**AdlaStep**](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-steps/azureml.pipeline.steps.adla_step.adlastep?view=azure-ml-py): Adds a step to run U-SQL script using Azure Data Lake Analytics.\n", + "- [**DataTransferStep**](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-steps/azureml.pipeline.steps.data_transfer_step.datatransferstep?view=azure-ml-py): Transfers data between Azure Blob and Data Lake accounts.\n", + "- [**DatabricksStep**](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-steps/azureml.pipeline.steps.databricks_step.databricksstep?view=azure-ml-py): Adds a DataBricks notebook as a step in a Pipeline.\n", + "- [**HyperDriveStep**](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-steps/azureml.pipeline.steps.hyper_drive_step.hyperdrivestep?view=azure-ml-py): Creates a Hyper Drive step for Hyper Parameter Tuning in a Pipeline.\n", + "\n", + "The following code will create a PythonScriptStep to be executed in the Azure Machine Learning Compute we created above using train.py, one of the files already made available in the project folder.\n", + "\n", + "A **PythonScriptStep** is a basic, built-in step to run a Python Script on a compute target. It takes a script name and optionally other parameters like arguments for the script, compute target, inputs and outputs. If no compute target is specified, default compute target for the workspace is used." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Uses default values for PythonScriptStep construct.\n", + "\n", + "# Syntax\n", + "# PythonScriptStep(\n", + "# script_name, \n", + "# name=None, \n", + "# arguments=None, \n", + "# compute_target=None, \n", + "# runconfig=None, \n", + "# inputs=None, \n", + "# outputs=None, \n", + "# params=None, \n", + "# source_directory=None, \n", + "# allow_reuse=True, \n", + "# version=None, \n", + "# hash_paths=None)\n", + "# This returns a Step\n", + "step1 = PythonScriptStep(name=\"train_step\",\n", + " script_name=\"train.py\", \n", + " compute_target=aml_compute, \n", + " source_directory=project_folder,\n", + " allow_reuse=False)\n", + "print(\"Step1 created\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "**Note:** In the above call to PythonScriptStep(), the flag *allow_reuse* determines whether the step should reuse previous results when run with the same settings/inputs. This flag's default value is *True*; the default is set to *True* because, when inputs and parameters have not changed, we typically do not want to re-run a given pipeline step. \n", + "\n", + "If *allow_reuse* is set to *False*, a new run will always be generated for this step during pipeline execution. The *allow_reuse* flag can come in handy in situations where you do *not* want to re-run a pipeline step." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Running a few steps in parallel\n", + "Here we are looking at a simple scenario where we are running a few steps (all involving PythonScriptStep) in parallel. 
Running nodes in **parallel** is the default behavior for steps in a pipeline.\n", + "\n", + "We already have one step defined earlier. Let's define few more steps." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# All steps use files already available in the project_folder\n", + "# All steps use the same Azure Machine Learning compute target as well\n", + "step2 = PythonScriptStep(name=\"compare_step\",\n", + " script_name=\"compare.py\", \n", + " compute_target=aml_compute, \n", + " source_directory=project_folder)\n", + "\n", + "step3 = PythonScriptStep(name=\"extract_step\",\n", + " script_name=\"extract.py\", \n", + " compute_target=aml_compute, \n", + " source_directory=project_folder)\n", + "\n", + "# list of steps to run\n", + "steps = [step1, step2, step3]\n", + "print(\"Step lists created\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Build the pipeline\n", + "Once we have the steps (or steps collection), we can build the [pipeline](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.pipeline.pipeline?view=azure-ml-py). By deafult, all these steps will run in **parallel** once we submit the pipeline for run.\n", + "\n", + "A pipeline is created with a list of steps and a workspace. Submit a pipeline using [submit](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.core.experiment%28class%29?view=azure-ml-py#submit). When submit is called, a [PipelineRun](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.pipelinerun?view=azure-ml-py) is created which in turn creates [StepRun](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.steprun?view=azure-ml-py) objects for each step in the workflow." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Syntax\n", + "# Pipeline(workspace, \n", + "# steps, \n", + "# description=None, \n", + "# default_datastore_name=None, \n", + "# default_source_directory=None, \n", + "# resolve_closure=True, \n", + "# _workflow_provider=None, \n", + "# _service_endpoint=None)\n", + "\n", + "pipeline1 = Pipeline(workspace=ws, steps=steps)\n", + "print (\"Pipeline is built\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Validate the pipeline\n", + "You have the option to [validate](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.pipeline.pipeline?view=azure-ml-py#validate) the pipeline prior to submitting for run. The platform runs validation steps such as checking for circular dependencies and parameter checks etc. even if you do not explicitly call validate method." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "pipeline1.validate()\n", + "print(\"Pipeline validation complete\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Submit the pipeline\n", + "[Submitting](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.pipeline.pipeline?view=azure-ml-py#submit) the pipeline involves creating an [Experiment](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.core.experiment?view=azure-ml-py) object and providing the built pipeline for submission. 
" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Submit syntax\n", + "# submit(experiment_name, \n", + "# pipeline_parameters=None, \n", + "# continue_on_node_failure=False, \n", + "# regenerate_outputs=False)\n", + "\n", + "pipeline_run1 = Experiment(ws, 'Hello_World1').submit(pipeline1, regenerate_outputs=True)\n", + "print(\"Pipeline is submitted for execution\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "**Note:** If regenerate_outputs is set to True, a new submit will always force generation of all step outputs, and disallow data reuse for any step of this run. Once this run is complete, however, subsequent runs may reuse the results of this run.\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Examine the pipeline run\n", + "\n", + "#### Use RunDetails Widget\n", + "We are going to use the RunDetails widget to examine the run of the pipeline. You can click each row below to get more details on the step runs." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "RunDetails(pipeline_run1).show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Use Pipeline SDK objects\n", + "You can cycle through the node_run objects and examine job logs, stdout, and stderr of each of the steps." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "step_runs = pipeline_run1.get_children()\n", + "for step_run in step_runs:\n", + " status = step_run.get_status()\n", + " print('Script:', step_run.name, 'status:', status)\n", + " \n", + " # Change this if you want to see details even if the Step has succeeded.\n", + " if status == \"Failed\":\n", + " joblog = step_run.get_job_log()\n", + " print('job log:', joblog)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Get additonal run details\n", + "If you wait until the pipeline_run is finished, you may be able to get additional details on the run. **Since this is a blocking call, the following code is commented out.**" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "#pipeline_run1.wait_for_completion()\n", + "#for step_run in pipeline_run1.get_children():\n", + "# print(\"{}: {}\".format(step_run.name, step_run.get_metrics()))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Running a few steps in sequence\n", + "Now let's see how we run a few steps in sequence. We already have three steps defined earlier. Let's *reuse* those steps for this part.\n", + "\n", + "We will reuse step1, step2, step3, but build the pipeline in such a way that we chain step3 after step2 and step2 after step1. Note that there is no explicit data dependency between these steps, but still steps can be made dependent by using the [run_after](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.builder.pipelinestep?view=azure-ml-py#run-after) construct." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "step2.run_after(step1)\n", + "step3.run_after(step2)\n", + "\n", + "# Try a loop\n", + "#step2.run_after(step3)\n", + "\n", + "# Now, construct the pipeline using the steps.\n", + "\n", + "# We can specify the \"final step\" in the chain, \n", + "# Pipeline will take care of \"transitive closure\" and \n", + "# figure out the implicit or explicit dependencies\n", + "# https://www.geeksforgeeks.org/transitive-closure-of-a-graph/\n", + "pipeline2 = Pipeline(workspace=ws, steps=[step3])\n", + "print (\"Pipeline is built\")\n", + "\n", + "pipeline2.validate()\n", + "print(\"Simple validation complete\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "pipeline_run2 = Experiment(ws, 'Hello_World2').submit(pipeline2)\n", + "print(\"Pipeline is submitted for execution\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "RunDetails(pipeline_run2).show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Next: Pipelines with data dependency\n", + "The next [notebook](./aml-pipelines-with-data-dependency-steps.ipynb) demostrates how to construct a pipeline with data dependency." + ] + } + ], + "metadata": { + "authors": [ + { + "name": "diray" + } + ], + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.6.7" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/pipeline/aml-pipelines-publish-and-run-using-rest-endpoint.ipynb b/pipeline/aml-pipelines-publish-and-run-using-rest-endpoint.ipynb new file mode 100644 index 000000000..b32381e8b --- /dev/null +++ b/pipeline/aml-pipelines-publish-and-run-using-rest-endpoint.ipynb @@ -0,0 +1,358 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Copyright (c) Microsoft Corporation. All rights reserved. \n", + "Licensed under the MIT License." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# How to Publish a Pipeline and Invoke the REST endpoint\n", + "In this notebook, we will see how we can publish a pipeline and then invoke the REST endpoint." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Prerequisites and Azure Machine Learning Basics\n", + "Make sure you go through the configuration Notebook located at https://github.com/Azure/MachineLearningNotebooks first if you haven't. This sets you up with a working config file that has information on your workspace, subscription id, etc. 
\n", + "\n", + "### Initialization Steps" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import azureml.core\n", + "from azureml.core import Workspace, Run, Experiment, Datastore\n", + "from azureml.core.compute import AmlCompute\n", + "from azureml.core.compute import ComputeTarget\n", + "from azureml.core.compute import DataFactoryCompute\n", + "from azureml.widgets import RunDetails\n", + "\n", + "# Check core SDK version number\n", + "print(\"SDK version:\", azureml.core.VERSION)\n", + "\n", + "from azureml.data.data_reference import DataReference\n", + "from azureml.pipeline.core import Pipeline, PipelineData, StepSequence\n", + "from azureml.pipeline.steps import PythonScriptStep\n", + "from azureml.pipeline.steps import DataTransferStep\n", + "from azureml.pipeline.core import PublishedPipeline\n", + "from azureml.pipeline.core.graph import PipelineParameter\n", + "\n", + "print(\"Pipeline SDK-specific imports completed\")\n", + "\n", + "ws = Workspace.from_config()\n", + "print(ws.name, ws.resource_group, ws.location, ws.subscription_id, sep = '\\n')\n", + "\n", + "# Default datastore (Azure file storage)\n", + "def_file_store = ws.get_default_datastore() \n", + "print(\"Default datastore's name: {}\".format(def_file_store.name))\n", + "\n", + "def_blob_store = Datastore(ws, \"workspaceblobstore\")\n", + "print(\"Blobstore's name: {}\".format(def_blob_store.name))\n", + "\n", + "# project folder\n", + "project_folder = '.'" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Compute Targets\n", + "#### Retrieve an already attached Azure Machine Learning Compute" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "\n", + "aml_compute_target = \"aml-compute\"\n", + "try:\n", + " aml_compute = AmlCompute(ws, aml_compute_target)\n", + " print(\"found existing compute target.\")\n", + "except:\n", + " print(\"creating new compute target\")\n", + " \n", + " provisioning_config = AmlCompute.provisioning_configuration(vm_size = \"STANDARD_D2_V2\",\n", + " min_nodes = 1, \n", + " max_nodes = 4) \n", + " aml_compute = ComputeTarget.create(ws, aml_compute_target, provisioning_config)\n", + " aml_compute.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)\n", + " \n", + "print(aml_compute.status.serialize())\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Building Pipeline Steps with Inputs and Outputs\n", + "As mentioned earlier, a step in the pipeline can take data as input. This data can be a data source that lives in one of the accessible data locations, or intermediate data produced by a previous step in the pipeline." 
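+    ,"\n",
+    "\n",
+    "The scripts referenced further below (`train.py`, `extract.py`, `compare.py`) receive these input and output locations as command-line arguments. The actual scripts are not shown in this notebook, so the following is only a sketch of the argument handling such a script is assumed to perform; the argument names match the ones passed to `trainStep` below, but the body is purely illustrative:\n",
+    "\n",
+    "```python\n",
+    "# Sketch of the argument handling assumed in train.py (not the actual file).\n",
+    "import argparse\n",
+    "import os\n",
+    "\n",
+    "parser = argparse.ArgumentParser()\n",
+    "parser.add_argument(\"--input_data\", type=str, help=\"path of the input DataReference\")\n",
+    "parser.add_argument(\"--output_train\", type=str, help=\"directory where the PipelineData output is written\")\n",
+    "args = parser.parse_args()\n",
+    "\n",
+    "os.makedirs(args.output_train, exist_ok=True)\n",
+    "with open(os.path.join(args.output_train, \"output.txt\"), \"w\") as f:\n",
+    "    f.write(\"processed \" + args.input_data)\n",
+    "```"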
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Reference the data uploaded to blob storage using DataReference\n", + "# Assign the datasource to blob_input_data variable\n", + "blob_input_data = DataReference(\n", + " datastore=def_blob_store,\n", + " data_reference_name=\"test_data\",\n", + " path_on_datastore=\"20newsgroups/20news.pkl\")\n", + "print(\"DataReference object created\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Define intermediate data using PipelineData\n", + "processed_data1 = PipelineData(\"processed_data1\",datastore=def_blob_store)\n", + "print(\"PipelineData object created\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Define a Step that consumes a datasource and produces intermediate data.\n", + "In this step, we define a step that consumes a datasource and produces intermediate data.\n", + "\n", + "**Open `train.py` in the local machine and examine the arguments, inputs, and outputs for the script. That will give you a good sense of why the script argument names used below are important.** " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# trainStep consumes the datasource (Datareference) in the previous step\n", + "# and produces processed_data1\n", + "trainStep = PythonScriptStep(\n", + " script_name=\"train.py\", \n", + " arguments=[\"--input_data\", blob_input_data, \"--output_train\", processed_data1],\n", + " inputs=[blob_input_data],\n", + " outputs=[processed_data1],\n", + " compute_target=aml_compute, \n", + " source_directory=project_folder\n", + ")\n", + "print(\"trainStep created\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Define a Step that consumes intermediate data and produces intermediate data\n", + "In this step, we define a step that consumes an intermediate data and produces intermediate data.\n", + "\n", + "**Open `extract.py` in the local machine and examine the arguments, inputs, and outputs for the script. That will give you a good sense of why the script argument names used below are important.** " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# extractStep to use the intermediate data produced by step4\n", + "# This step also produces an output processed_data2\n", + "processed_data2 = PipelineData(\"processed_data2\", datastore=def_blob_store)\n", + "\n", + "extractStep = PythonScriptStep(\n", + " script_name=\"extract.py\",\n", + " arguments=[\"--input_extract\", processed_data1, \"--output_extract\", processed_data2],\n", + " inputs=[processed_data1],\n", + " outputs=[processed_data2],\n", + " compute_target=aml_compute, \n", + " source_directory=project_folder)\n", + "print(\"extractStep created\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Define a Step that consumes multiple intermediate data and produces intermediate data\n", + "In this step, we define a step that consumes multiple intermediate data and produces intermediate data." 
+ ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### PipelineParameter" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "This step also has a [PipelineParameter](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.graph.pipelineparameter?view=azure-ml-py) argument that help with calling the REST endpoint of the published pipeline." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# We will use this later in publishing pipeline\n", + "pipeline_param = PipelineParameter(name=\"pipeline_arg\", default_value=10)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "**Open `compare.py` in the local machine and examine the arguments, inputs, and outputs for the script. That will give you a good sense of why the script argument names used below are important.**" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Now define step6 that takes two inputs (both intermediate data), and produce an output\n", + "processed_data3 = PipelineData(\"processed_data3\", datastore=def_blob_store)\n", + "\n", + "\n", + "\n", + "compareStep = PythonScriptStep(\n", + " script_name=\"compare.py\",\n", + " arguments=[\"--compare_data1\", processed_data1, \"--compare_data2\", processed_data2, \"--output_compare\", processed_data3, \"--pipeline_param\", pipeline_param],\n", + " inputs=[processed_data1, processed_data2],\n", + " outputs=[processed_data3], \n", + " compute_target=aml_compute, \n", + " source_directory=project_folder)\n", + "print(\"compareStep created\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Build the pipeline" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "pipeline1 = Pipeline(workspace=ws, steps=[compareStep])\n", + "print (\"Pipeline is built\")\n", + "\n", + "pipeline1.validate()\n", + "print(\"Simple validation complete\") " + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Publish the pipeline" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "published_pipeline1 = pipeline1.publish(name=\"My_New_Pipeline\", description=\"My Published Pipeline Description\")\n", + "print(published_pipeline1.id)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Run published pipeline using its REST endpoint" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from azureml.core.authentication import AzureCliAuthentication\n", + "import requests\n", + "\n", + "cli_auth = AzureCliAuthentication()\n", + "aad_token = cli_auth.get_authentication_header()\n", + "\n", + "rest_endpoint1 = published_pipeline1.endpoint\n", + "\n", + "print(rest_endpoint1)\n", + "\n", + "# specify the param when running the pipeline\n", + "response = requests.post(rest_endpoint1, \n", + " headers=aad_token, \n", + " json={\"ExperimentName\": \"My_Pipeline1\",\n", + " \"RunSource\": \"SDK\",\n", + " \"ParameterAssignments\": {\"pipeline_arg\": 45}})\n", + "run_id = response.json()[\"Id\"]\n", + "\n", + "print(run_id)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Next: Data Transfer\n", + "The next [notebook](./aml-pipelines-data-transfer.ipynb) will showcase data transfer steps between 
different types of data stores." + ] + } + ], + "metadata": { + "authors": [ + { + "name": "diray" + } + ], + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.6.7" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/pipeline/aml-pipelines-use-adla-as-compute-target.ipynb b/pipeline/aml-pipelines-use-adla-as-compute-target.ipynb new file mode 100644 index 000000000..3740f027f --- /dev/null +++ b/pipeline/aml-pipelines-use-adla-as-compute-target.ipynb @@ -0,0 +1,348 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Copyright (c) Microsoft Corporation. All rights reserved. \n", + "Licensed under the MIT License." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# AML Pipeline with AdlaStep\n", + "This notebook is used to demonstrate the use of AdlaStep in AML Pipeline." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## AML and Pipeline SDK-specific imports" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "import azureml.core\n", + "from azureml.core.compute import ComputeTarget, DatabricksCompute\n", + "from azureml.exceptions import ComputeTargetException\n", + "from azureml.core import Workspace, Run, Experiment\n", + "from azureml.pipeline.core import Pipeline, PipelineData\n", + "from azureml.pipeline.steps import AdlaStep\n", + "from azureml.core.datastore import Datastore\n", + "from azureml.data.data_reference import DataReference\n", + "from azureml.core import attach_legacy_compute_target\n", + "\n", + "# Check core SDK version number\n", + "print(\"SDK version:\", azureml.core.VERSION)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Initialize Workspace\n", + "\n", + "Initialize a workspace object from persisted configuration. 
Make sure the config file is present at .\\config.json" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "tags": [ + "create workspace" + ] + }, + "outputs": [], + "source": [ + "ws = Workspace.from_config()\n", + "print(ws.name, ws.resource_group, ws.location, ws.subscription_id, sep = '\\n')" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "script_folder = '.'\n", + "experiment_name = \"adla_101_experiment\"\n", + "ws._initialize_folder(experiment_name=experiment_name, directory=script_folder)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Register Datastore" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# un-comment the following and replace the strings with the \n", + "# correct values for your ADLS datastore\n", + "\n", + "# workspace=\"\"\n", + "# subscription_id = \"\"\n", + "# resource_group = \"\"\n", + "# store_name = \"\"\n", + "# tenant_id = \"\"\n", + "# client_id = \"\"\n", + "# client_secret = \"\"\n", + "\n", + "\n", + "try:\n", + " adls_datastore = Datastore.get(ws, datastore_name)\n", + " print(\"found datastore with name: %s\" % datastore_name)\n", + "except:\n", + " adls_datastore = Datastore.register_azure_data_lake(\n", + " workspace=ws,\n", + " datastore_name=datastore_name,\n", + " subscription_id=subscription_id, # subscription id of ADLS account\n", + " resource_group=resource_group, # resource group of ADLS account\n", + " store_name=store_name, # ADLS account name\n", + " tenant_id=tenant_id, # tenant id of service principal\n", + " client_id=client_id, # client id of service principal\n", + " client_secret=client_secret) # the secret of service principal\n", + " print(\"registered datastore with name: %s\" % datastore_name)\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Create DataReferences and PipelineData\n", + "\n", + "In the code cell below, replace datastorename with your default datastore name. Copy the file `testdata.txt` (located in the pipeline folder that this notebook is in) to the path on the datastore." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "datastorename = \"TestAdlsDatastore\"\n", + "\n", + "adls_datastore = Datastore(workspace=ws, name=datastorename)\n", + "script_input = DataReference(\n", + " datastore=adls_datastore,\n", + " data_reference_name=\"script_input\",\n", + " path_on_datastore=\"testdata/testdata.txt\")\n", + "\n", + "script_output = PipelineData(\"script_output\", datastore=adls_datastore)\n", + "\n", + "print(\"Created Pipeline Data\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Setup Data Lake Account\n", + "\n", + "ADLA can only use data that is located in the default data store associated with that ADLA account. Through Azure portal, check the name of the default data store corresponding to the ADLA account you are using below. Replace the value associated with `adla_compute_name` in the code cell below accordingly." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "adla_compute_name = 'testadl' # Replace this with your default compute\n", + "\n", + "from azureml.core.compute import ComputeTarget, AdlaCompute\n", + "\n", + "def get_or_create_adla_compute(workspace, compute_name):\n", + " try:\n", + " return AdlaCompute(workspace, compute_name)\n", + " except ComputeTargetException as e:\n", + " if 'ComputeTargetNotFound' in e.message:\n", + " print('adla compute not found, creating...')\n", + " provisioning_config = AdlaCompute.provisioning_configuration()\n", + " adla_compute = ComputeTarget.create(workspace, compute_name, provisioning_config)\n", + " adla_compute.wait_for_completion()\n", + " return adla_compute\n", + " else:\n", + " raise e\n", + " \n", + "adla_compute = get_or_create_adla_compute(ws, adla_compute_name)\n", + "\n", + "# CLI:\n", + "# Create: az ml computetarget setup adla -n \n", + "# BYOC: az ml computetarget attach adla -n -i " + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Once the above code cell completes, run the below to check your ADLA compute status:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "print(\"ADLA compute state:{}\".format(adla_compute.provisioning_state))\n", + "print(\"ADLA compute state:{}\".format(adla_compute.provisioning_errors))\n", + "print(\"Using ADLA compute:{}\".format(adla_compute.cluster_resource_id))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Create an AdlaStep" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "**AdlaStep** is used to run U-SQL script using Azure Data Lake Analytics.\n", + "\n", + "- **name:** Name of module\n", + "- **script_name:** name of U-SQL script\n", + "- **inputs:** List of input port bindings\n", + "- **outputs:** List of output port bindings\n", + "- **adla_compute:** the ADLA compute to use for this job\n", + "- **params:** Dictionary of name-value pairs to pass to U-SQL job *(optional)*\n", + "- **degree_of_parallelism:** the degree of parallelism to use for this job *(optional)*\n", + "- **priority:** the priority value to use for the current job *(optional)*\n", + "- **runtime_version:** the runtime version of the Data Lake Analytics engine *(optional)*\n", + "- **root_folder:** folder that contains the script, assemblies etc. 
*(optional)*\n", + "- **hash_paths:** list of paths to hash to detect a change (script file is always hashed) *(optional)*" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "adla_step = AdlaStep(\n", + " name='adla_script_step',\n", + " script_name='test_adla_script.usql',\n", + " inputs=[script_input],\n", + " outputs=[script_output],\n", + " compute_target=adla_compute)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Build and Submit the Experiment" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "pipeline = Pipeline(\n", + " description=\"adla_102\",\n", + " workspace=ws, \n", + " steps=[adla_step],\n", + " default_source_directory=script_folder)\n", + "\n", + "pipeline_run = Experiment(workspace, experiment_name).submit(pipeline)\n", + "pipeline_run.wait_for_completion()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### View Run Details" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from azureml.widgets import RunDetails\n", + "RunDetails(pipeline_run).show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Examine the run\n", + "You can cycle through the node_run objects and examine job logs, stdout, and stderr of each of the steps." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "step_runs = pipeline_run.get_children()\n", + "for step_run in step_runs:\n", + " status = step_run.get_status()\n", + " print('node', step_run.name, 'status:', status)\n", + " if status == \"Failed\":\n", + " joblog = step_run.get_job_log()\n", + " print('job log:', joblog)\n", + " stdout_log = step_run.get_stdout_log()\n", + " print('stdout log:', stdout_log)\n", + " stderr_log = step_run.get_stderr_log()\n", + " print('stderr log:', stderr_log)\n", + " with open(\"logs-\" + step_run.name + \".txt\", \"w\") as f:\n", + " f.write(joblog)\n", + " print(\"Job log written to logs-\"+ step_run.name + \".txt\")\n", + " if status == \"Finished\":\n", + " stdout_log = step_run.get_stdout_log()\n", + " print('stdout log:', stdout_log)" + ] + } + ], + "metadata": { + "authors": [ + { + "name": "diray" + } + ], + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.6.7" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/pipeline/aml-pipelines-use-databricks-as-compute-target.ipynb b/pipeline/aml-pipelines-use-databricks-as-compute-target.ipynb new file mode 100644 index 000000000..3c792fdc1 --- /dev/null +++ b/pipeline/aml-pipelines-use-databricks-as-compute-target.ipynb @@ -0,0 +1,651 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Copyright (c) Microsoft Corporation. All rights reserved. \n", + "Licensed under the MIT License." 
+ ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Using Databricks as a Compute Target from Azure Machine Learning Pipeline\n", + "To use Databricks as a compute target from [Azure Machine Learning Pipeline](https://docs.microsoft.com/en-us/azure/machine-learning/service/concept-ml-pipelines), a [DatabricksStep](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-steps/azureml.pipeline.steps.databricks_step.databricksstep?view=azure-ml-py) is used. This notebook demonstrates the use of DatabricksStep in Azure Machine Learning Pipeline.\n", + "\n", + "The notebook will show:\n", + "1. Running an arbitrary Databricks notebook that the customer has in Databricks workspace\n", + "2. Running an arbitrary Python script that the customer has in DBFS\n", + "3. Running an arbitrary Python script that is available on local computer (will upload to DBFS, and then run in Databricks) \n", + "4. Running a JAR job that the customer has in DBFS.\n", + "\n", + "## Before you begin:\n", + "\n", + "1. **Create an Azure Databricks workspace** in the same subscription where you have your Azure Machine Learning workspace. You will need details of this workspace later on to define DatabricksStep. [Click here](https://ms.portal.azure.com/#blade/HubsExtension/Resources/resourceType/Microsoft.Databricks%2Fworkspaces) for more information.\n", + "2. **Create PAT (access token)**: Manually create a Databricks access token at the Azure Databricks portal. See [this](https://docs.databricks.com/api/latest/authentication.html#generate-a-token) for more information.\n", + "3. **Add demo notebook to ADB**: This notebook has a sample you can use as is. Launch Azure Databricks attached to your Azure Machine Learning workspace and add a new notebook. \n", + "4. **Create/attach a Blob storage** for use from ADB" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Add demo notebook to ADB Workspace\n", + "Copy and paste the below code to create a new notebook in your ADB workspace." 
+ ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "```python\n", + "# direct access\n", + "dbutils.widgets.get(\"myparam\")\n", + "p = getArgument(\"myparam\")\n", + "print (\"Param -\\'myparam':\")\n", + "print (p)\n", + "\n", + "dbutils.widgets.get(\"input\")\n", + "i = getArgument(\"input\")\n", + "print (\"Param -\\'input':\")\n", + "print (i)\n", + "\n", + "dbutils.widgets.get(\"output\")\n", + "o = getArgument(\"output\")\n", + "print (\"Param -\\'output':\")\n", + "print (o)\n", + "\n", + "n = i + \"/testdata.txt\"\n", + "df = spark.read.csv(n)\n", + "\n", + "display (df)\n", + "\n", + "data = [('value1', 'value2')]\n", + "df2 = spark.createDataFrame(data)\n", + "\n", + "z = o + \"/output.txt\"\n", + "df2.write.csv(z)\n", + "```" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Azure Machine Learning and Pipeline SDK-specific imports" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "import azureml.core\n", + "from azureml.core.runconfig import JarLibrary\n", + "from azureml.core.compute import ComputeTarget, DatabricksCompute\n", + "from azureml.exceptions import ComputeTargetException\n", + "from azureml.core import Workspace, Run, Experiment\n", + "from azureml.pipeline.core import Pipeline, PipelineData\n", + "from azureml.pipeline.steps import DatabricksStep\n", + "from azureml.core.datastore import Datastore\n", + "from azureml.data.data_reference import DataReference\n", + "\n", + "# Check core SDK version number\n", + "print(\"SDK version:\", azureml.core.VERSION)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Initialize Workspace\n", + "\n", + "Initialize a workspace object from persisted configuration. Make sure the config file is present at .\\config.json" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "ws = Workspace.from_config()\n", + "print(ws.name, ws.resource_group, ws.location, ws.subscription_id, sep = '\\n')" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Attach Databricks compute target\n", + "Next, you need to add your Databricks workspace to Azure Machine Learning as a compute target and give it a name. 
You will use this name to refer to your Databricks workspace compute target inside Azure Machine Learning.\n", + "\n", + "- **Resource Group** - The resource group name of your Azure Machine Learning workspace\n", + "- **Databricks Workspace Name** - The workspace name of your Azure Databricks workspace\n", + "- **Databricks Access Token** - The access token you created in ADB" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Replace with your account info before running.\n", + "\n", + "# db_compute_name = \"\"\n", + "# aml_resource_group = \"\"\n", + "# db_workspace_name = \"\"\n", + "# access_token = \"\"\n", + "\n", + "try:\n", + " databricks_compute = ComputeTarget(workspace=ws, name=db_compute_name)\n", + " print('Compute target {} already exists'.format(db_compute_name))\n", + "except ComputeTargetException:\n", + " print('compute not found')\n", + " print('databricks_compute_name {}'.format(db_compute_name))\n", + " print('databricks_workspace_name {}'.format(db_workspace_name))\n", + " print('databricks_access_token {}'.format(access_token))\n", + "\n", + " config = DatabricksCompute.attach_configuration(resource_group=aml_resource_group, workspace_name=db_workspace_name, access_token=access_token)\n", + " databricks_compute = ComputeTarget.attach(ws, db_compute_name, config)\n", + " databricks_compute.wait_for_completion(True)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Data Connections with Inputs and Outputs\n", + "The DatabricksStep supports Azure Blob storage and ADLS for inputs and outputs. You will also need to define a [Secrets](https://docs.azuredatabricks.net/user-guide/secrets/index.html) scope to enable authentication to external data sources such as Blob and ADLS from Databricks; one way to create such a scope is sketched below.\n", + "\n", + "- Databricks documentation on [Azure Blob](https://docs.azuredatabricks.net/spark/latest/data-sources/azure/azure-storage.html)\n", + "- Databricks documentation on [ADLS](https://docs.databricks.com/spark/latest/data-sources/azure/azure-datalake.html)\n", + "\n", + "### Type of Data Access\n", + "Databricks lets you interact with Azure Blob storage and ADLS in two ways.\n", + "- **Direct Access**: Databricks allows you to interact with Azure Blob or ADLS URIs directly. The input or output URIs will be mapped to a Databricks widget param in the Databricks notebook.\n", + "- **Mounting**: You will be supplied with additional parameters and secrets that will enable you to mount your ADLS or Azure Blob input or output location in your Databricks notebook."
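The notebooks themselves do not show how the secret scope is created. As a rough sketch only (an assumption about your setup, not the notebook's own method), the scope and a secret can be created through the Databricks Secrets REST API using the PAT generated earlier; the instance URL, scope name, and secret name below are placeholders to replace with your own values.

```python
import requests

# Placeholder values -- substitute your own Databricks instance URL, PAT, and names.
databricks_instance = "https://YOUR_REGION.azuredatabricks.net"
access_token = "YOUR_DATABRICKS_PAT"
headers = {"Authorization": "Bearer " + access_token}

# Create a secret scope; the mounting samples below refer to one named "amlscope".
resp = requests.post(databricks_instance + "/api/2.0/secrets/scopes/create",
                     headers=headers,
                     json={"scope": "amlscope", "initial_manage_principal": "users"})
print(resp.status_code, resp.text)

# Store a storage credential (account key or SAS) in that scope.
resp = requests.post(databricks_instance + "/api/2.0/secrets/put",
                     headers=headers,
                     json={"scope": "amlscope",
                           "key": "YOUR_SECRET_NAME",
                           "string_value": "YOUR_STORAGE_KEY_OR_SAS"})
print(resp.status_code, resp.text)
```

The Databricks CLI (`databricks secrets create-scope` / `databricks secrets put`) is an equivalent alternative if you prefer not to call the REST API directly.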
+ ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Direct Access: Python sample code\n", + "If you have a data reference named \"input\" it will represent the URI of the input and you can access it directly in the Databricks python notebook like so:" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "```python\n", + "dbutils.widgets.get(\"input\")\n", + "y = getArgument(\"input\")\n", + "df = spark.read.csv(y)\n", + "```" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Mounting: Python sample code for Azure Blob\n", + "Given an Azure Blob data reference named \"input\" the following widget params will be made available in the Databricks notebook:" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "```python\n", + "# This contains the input URI\n", + "dbutils.widgets.get(\"input\")\n", + "myinput_uri = getArgument(\"input\")\n", + "\n", + "# How to get the input datastore name inside ADB notebook\n", + "# This contains the name of a Databricks secret (in the predefined \"amlscope\" secret scope) \n", + "# that contians an access key or sas for the Azure Blob input (this name is obtained by appending \n", + "# the name of the input with \"_blob_secretname\". \n", + "dbutils.widgets.get(\"input_blob_secretname\") \n", + "myinput_blob_secretname = getArgument(\"input_blob_secretname\")\n", + "\n", + "# This contains the required configuration for mounting\n", + "dbutils.widgets.get(\"input_blob_config\")\n", + "myinput_blob_config = getArgument(\"input_blob_config\")\n", + "\n", + "# Usage\n", + "dbutils.fs.mount(\n", + " source = myinput_uri,\n", + " mount_point = \"/mnt/input\",\n", + " extra_configs = {myinput_blob_config:dbutils.secrets.get(scope = \"amlscope\", key = myinput_blob_secretname)})\n", + "```" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Mounting: Python sample code for ADLS\n", + "Given an ADLS data reference named \"input\" the following widget params will be made available in the Databricks notebook:" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "```python\n", + "# This contains the input URI\n", + "dbutils.widgets.get(\"input\") \n", + "myinput_uri = getArgument(\"input\")\n", + "\n", + "# This contains the client id for the service principal \n", + "# that has access to the adls input\n", + "dbutils.widgets.get(\"input_adls_clientid\") \n", + "myinput_adls_clientid = getArgument(\"input_adls_clientid\")\n", + "\n", + "# This contains the name of a Databricks secret (in the predefined \"amlscope\" secret scope) \n", + "# that contains the secret for the above mentioned service principal\n", + "dbutils.widgets.get(\"input_adls_secretname\") \n", + "myinput_adls_secretname = getArgument(\"input_adls_secretname\")\n", + "\n", + "# This contains the refresh url for the mounting configs\n", + "dbutils.widgets.get(\"input_adls_refresh_url\") \n", + "myinput_adls_refresh_url = getArgument(\"input_adls_refresh_url\")\n", + "\n", + "# Usage \n", + "configs = {\"dfs.adls.oauth2.access.token.provider.type\": \"ClientCredential\",\n", + " \"dfs.adls.oauth2.client.id\": myinput_adls_clientid,\n", + " \"dfs.adls.oauth2.credential\": dbutils.secrets.get(scope = \"amlscope\", key =myinput_adls_secretname),\n", + " \"dfs.adls.oauth2.refresh.url\": myinput_adls_refresh_url}\n", + "\n", + "dbutils.fs.mount(\n", + " source = myinput_uri,\n", + " mount_point = \"/mnt/output\",\n", + " extra_configs = configs)\n", + "```" + 
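Once a location is mounted, it behaves like any other DBFS path. The snippet below is a small illustrative addition (not part of the original samples): it lists and reads the Azure Blob input mounted at `/mnt/input` above and unmounts it when done, using standard `dbutils` calls inside the Databricks notebook.

```python
# List what the mount point exposes (assumes the Azure Blob mount at /mnt/input above).
display(dbutils.fs.ls("/mnt/input"))

# Read through the mount point just like a local DBFS path.
df = spark.read.csv("/mnt/input/testdata.txt")
display(df)

# Unmount when finished so a later run can mount cleanly again.
dbutils.fs.unmount("/mnt/input")
```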
] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Use Databricks from Azure Machine Learning Pipeline\n", + "To use Databricks as a compute target from Azure Machine Learning Pipeline, a DatabricksStep is used. Let's define a datasource (via DataReference) and intermediate data (via PipelineData) to be used in DatabricksStep." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Use the default blob storage\n", + "def_blob_store = Datastore(ws, \"workspaceblobstore\")\n", + "print('Datastore {} will be used'.format(def_blob_store.name))\n", + "\n", + "# We are uploading a sample file in the local directory to be used as a datasource\n", + "def_blob_store.upload_files([\"./testdata.txt\"], target_path=\"dbtest\", overwrite=False)\n", + "\n", + "step_1_input = DataReference(datastore=def_blob_store, path_on_datastore=\"dbtest\",\n", + " data_reference_name=\"input\")\n", + "\n", + "step_1_output = PipelineData(\"output\", datastore=def_blob_store)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Add a DatabricksStep\n", + "Adds a Databricks notebook as a step in a Pipeline.\n", + "- ***name:** Name of the Module\n", + "- **inputs:** List of input connections for data consumed by this step. Fetch this inside the notebook using dbutils.widgets.get(\"input\")\n", + "- **outputs:** List of output port definitions for outputs produced by this step. Fetch this inside the notebook using dbutils.widgets.get(\"output\")\n", + "- **spark_version:** Version of spark for the databricks run cluster. default value: 4.0.x-scala2.11\n", + "- **node_type:** Azure vm node types for the databricks run cluster. default value: Standard_D3_v2\n", + "- **num_workers:** Number of workers for the databricks run cluster\n", + "- **autoscale:** The autoscale configuration for the databricks run cluster\n", + "- **spark_env_variables:** Spark environment variables for the databricks run cluster (dictionary of {str:str}). default value: {'PYSPARK_PYTHON': '/databricks/python3/bin/python3'}\n", + "- ***notebook_path:** Path to the notebook in the databricks instance.\n", + "- **notebook_params:** Parameters for the databricks notebook (dictionary of {str:str}). Fetch this inside the notebook using dbutils.widgets.get(\"myparam\")\n", + "- **run_name:** Name in databricks for this run\n", + "- **timeout_seconds:** Timeout for the databricks run\n", + "- **maven_libraries:** maven libraries for the databricks run\n", + "- **pypi_libraries:** pypi libraries for the databricks run\n", + "- **egg_libraries:** egg libraries for the databricks run\n", + "- **jar_libraries:** jar libraries for the databricks run\n", + "- **rcran_libraries:** rcran libraries for the databricks run\n", + "- **databricks_compute:** Azure Databricks compute\n", + "- **databricks_compute_name:** Name of Azure Databricks compute\n", + "\n", + "\\* *denotes required fields* \n", + "*You must provide exactly one of num_workers or autoscale paramaters* \n", + "*You must provide exactly one of databricks_compute or databricks_compute_name parameters*" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### 1. 
Running the demo notebook already added to the Databricks workspace\n", + "The commented out code in the below cell assumes that you have created a notebook called `demo_notebook` in Azure Databricks under your user folder so you can use `notebook_path = \"/Users/you@company.com/demo_notebook\"`:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# notebook_path = \"/Users/you@company.com/demo_notebook\"\n", + "\n", + "dbNbStep = DatabricksStep(\n", + " name=\"DBNotebookInWS\",\n", + " inputs=[step_1_input],\n", + " outputs=[step_1_output],\n", + " num_workers=1,\n", + " notebook_path=notebook_path,\n", + " notebook_params={'myparam': 'testparam'},\n", + " run_name='DB_Notebook_demo',\n", + " compute_target=databricks_compute,\n", + " allow_reuse=False\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Build and submit the Experiment" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "steps = [dbNbStep]\n", + "pipeline = Pipeline(workspace=ws, steps=steps)\n", + "pipeline_run = Experiment(ws, 'DB_Notebook_demo').submit(pipeline)\n", + "pipeline_run.wait_for_completion()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### View Run Details" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from azureml.widgets import RunDetails\n", + "RunDetails(pipeline_run).show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### 2. Running a Python script that is already added in DBFS\n", + "To run a Python script that is already uploaded to DBFS, follow the instructions below. You will first upload the Python script to DBFS using the [CLI](https://docs.azuredatabricks.net/user-guide/dbfs-databricks-file-system.html).\n", + "\n", + "The commented out code in the below cell assumes that you have uploaded `train-db-dbfs.py` to the root folder in DBFS. 
You can upload `train-db-dbfs.py` to the root folder in DBFS using this commandline so you can use `python_script_path = \"dbfs:/train-db-dbfs.py\"`:\n", + "\n", + "```\n", + "dbfs cp ./train-db-dbfs.py dbfs:/train-db-dbfs.py\n", + "```" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "python_script_path = \"dbfs:/train-db-dbfs.py\"\n", + "\n", + "dbPythonInDbfsStep = DatabricksStep(\n", + " name=\"DBPythonInDBFS\",\n", + " inputs=[step_1_input],\n", + " num_workers=1,\n", + " python_script_path=python_script_path,\n", + " python_script_params={'--input_data'},\n", + " run_name='DB_Python_demo',\n", + " compute_target=databricks_compute,\n", + " allow_reuse=False\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Build and submit the Experiment" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "steps = [dbPythonInDbfsStep]\n", + "pipeline = Pipeline(workspace=ws, steps=steps)\n", + "pipeline_run = Experiment(ws, 'DB_Python_demo').submit(pipeline)\n", + "pipeline_run.wait_for_completion()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### View Run Details" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from azureml.widgets import RunDetails\n", + "RunDetails(pipeline_run).show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### 3. Running a Python script in Databricks that is currently on the local computer\n", + "To run a Python script that is currently on your local computer, follow the instructions below. \n", + "\n", + "The code below assumes that you have `train-db-local.py` in the current working directory (the `source_directory` used below).\n", + "\n", + "In this case, the Python script will be uploaded first to DBFS, and then the script will be run in Databricks." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "python_script_name = \"train-db-local.py\"\n", + "source_directory = \".\"\n", + "\n", + "dbPythonInLocalMachineStep = DatabricksStep(\n", + " name=\"DBPythonInLocalMachine\",\n", + " inputs=[step_1_input],\n", + " num_workers=1,\n", + " python_script_name=python_script_name,\n", + " source_directory=source_directory,\n", + " run_name='DB_Python_Local_demo',\n", + " compute_target=databricks_compute,\n", + " allow_reuse=False\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Build and submit the Experiment" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "steps = [dbPythonInLocalMachineStep]\n", + "pipeline = Pipeline(workspace=ws, steps=steps)\n", + "pipeline_run = Experiment(ws, 'DB_Python_Local_demo').submit(pipeline)\n", + "pipeline_run.wait_for_completion()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### View Run Details" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from azureml.widgets import RunDetails\n", + "RunDetails(pipeline_run).show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### 4. Running a JAR job that is already added in DBFS\n", + "To run a JAR job that is already uploaded to DBFS, follow the instructions below.
You will first upload the JAR file to DBFS using the [CLI](https://docs.azuredatabricks.net/user-guide/dbfs-databricks-file-system.html).\n", + "\n", + "The commented out code in the below cell assumes that you have uploaded `train-db-dbfs.jar` to the root folder in DBFS. You can upload `train-db-dbfs.jar` to the root folder in DBFS using this commandline so you can use `jar_library_dbfs_path = \"dbfs:/train-db-dbfs.jar\"`:\n", + "\n", + "```\n", + "dbfs cp ./train-db-dbfs.jar dbfs:/train-db-dbfs.jar\n", + "```" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "main_jar_class_name = \"com.microsoft.aeva.Main\"\n", + "jar_library_dbfs_path = \"dbfs:/train-db-dbfs.jar\"\n", + "\n", + "dbJarInDbfsStep = DatabricksStep(\n", + " name=\"DBJarInDBFS\",\n", + " inputs=[step_1_input],\n", + " num_workers=1,\n", + " main_class_name=main_jar_class_name,\n", + " jar_params={'arg1', 'arg2'},\n", + " run_name='DB_JAR_demo',\n", + " jar_libraries=[JarLibrary(jar_library_dbfs_path)],\n", + " compute_target=databricks_compute,\n", + " allow_reuse=False\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Build and submit the Experiment" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "steps = [dbJarInDbfsStep]\n", + "pipeline = Pipeline(workspace=ws, steps=steps)\n", + "pipeline_run = Experiment(ws, 'DB_JAR_demo').submit(pipeline)\n", + "pipeline_run.wait_for_completion()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### View Run Details" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from azureml.widgets import RunDetails\n", + "RunDetails(pipeline_run).show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Next: ADLA as a Compute Target\n", + "To use ADLA as a compute target from Azure Machine Learning Pipeline, a AdlaStep is used. This [notebook](./aml-pipelines-use-adla-as-compute-target.ipynb) demonstrates the use of AdlaStep in Azure Machine Learning Pipeline." + ] + } + ], + "metadata": { + "authors": [ + { + "name": "diray" + } + ], + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.6.7" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/pipeline/aml-pipelines-with-data-dependency-steps.ipynb b/pipeline/aml-pipelines-with-data-dependency-steps.ipynb new file mode 100644 index 000000000..4cd7142c2 --- /dev/null +++ b/pipeline/aml-pipelines-with-data-dependency-steps.ipynb @@ -0,0 +1,409 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Copyright (c) Microsoft Corporation. All rights reserved. \n", + "Licensed under the MIT License." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Azure Machine Learning Pipelines with Data Dependency\n", + "In this notebook, we will see how we can build a pipeline with implicit data dependancy." 
+ ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Prerequisites and Azure Machine Learning Basics\n", + "Make sure you go through the configuration Notebook located at https://github.com/Azure/MachineLearningNotebooks first if you haven't. This sets you up with a working config file that has information on your workspace, subscription id, etc. \n", + "\n", + "### Azure Machine Learning and Pipeline SDK-specific Imports" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import azureml.core\n", + "from azureml.core import Workspace, Run, Experiment, Datastore\n", + "from azureml.core.compute import AmlCompute\n", + "from azureml.core.compute import ComputeTarget\n", + "from azureml.core.compute import DataFactoryCompute\n", + "from azureml.widgets import RunDetails\n", + "\n", + "# Check core SDK version number\n", + "print(\"SDK version:\", azureml.core.VERSION)\n", + "\n", + "from azureml.data.data_reference import DataReference\n", + "from azureml.pipeline.core import Pipeline, PipelineData, StepSequence\n", + "from azureml.pipeline.steps import PythonScriptStep\n", + "from azureml.pipeline.steps import DataTransferStep\n", + "from azureml.pipeline.core import PublishedPipeline\n", + "from azureml.pipeline.core.graph import PipelineParameter\n", + "\n", + "print(\"Pipeline SDK-specific imports completed\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Initialize Workspace\n", + "\n", + "Initialize a [workspace](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.core.workspace(class%29) object from persisted configuration." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "tags": [ + "create workspace" + ] + }, + "outputs": [], + "source": [ + "ws = Workspace.from_config()\n", + "print(ws.name, ws.resource_group, ws.location, ws.subscription_id, sep = '\\n')\n", + "\n", + "# Default datastore (Azure file storage)\n", + "def_file_store = ws.get_default_datastore() \n", + "print(\"Default datastore's name: {}\".format(def_file_store.name))\n", + "\n", + "def_blob_store = Datastore(ws, \"workspaceblobstore\")\n", + "print(\"Blobstore's name: {}\".format(def_blob_store.name))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# project folder\n", + "project_folder = '.'\n", + " \n", + "print('Sample projects will be created in {}.'.format(project_folder))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Required data and script files for the tutorial\n", + "Sample files required to finish this tutorial are already copied to the project folder specified above. Even though the .py files provided in the samples don't contain much \"ML work,\" this is the part you will work on extensively as a data scientist. To complete this tutorial, the contents of these files are not very important. The one-line files are for demonstration purposes only." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Compute Targets\n", + "See the list of Compute Targets on the workspace."
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "cts = ws.compute_targets\n", + "for ct in cts:\n", + " print(ct)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Retrieve or create an AmlCompute target\n", + "Azure Machine Learning Compute is a service for provisioning and managing clusters of Azure virtual machines for running machine learning workloads. Let's create a new AmlCompute target in the current workspace, if it doesn't already exist. We will then run the training script on this compute target." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "\n", + "aml_compute_target = \"aml-compute\"\n", + "try:\n", + " aml_compute = AmlCompute(ws, aml_compute_target)\n", + " print(\"found existing compute target.\")\n", + "except:\n", + " print(\"creating new compute target\")\n", + " \n", + " provisioning_config = AmlCompute.provisioning_configuration(vm_size = \"STANDARD_D2_V2\",\n", + " min_nodes = 1, \n", + " max_nodes = 4) \n", + " aml_compute = ComputeTarget.create(ws, aml_compute_target, provisioning_config)\n", + " aml_compute.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)\n", + " \n", + "print(\"Aml Compute attached\")\n", + "# For a more detailed view of current AmlCompute status, use the 'status' property \n", + "print(aml_compute.status.serialize())" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "**Wait for this call to finish before proceeding (you will see the asterisk turning to a number).**\n", + "\n", + "Now that you have created the compute target, let's see what the workspace's compute_targets property returns. You should now see one entry named 'aml-compute' of type AmlCompute." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Building Pipeline Steps with Inputs and Outputs\n", + "As mentioned earlier, a step in the pipeline can take data as input. This data can be a data source that lives in one of the accessible data locations, or intermediate data produced by a previous step in the pipeline.\n", + "\n", + "### Datasources\n", + "A datasource is represented by a **[DataReference](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.data.data_reference.datareference?view=azure-ml-py)** object and points to data that lives in, or is accessible from, a datastore. A DataReference could be a pointer to a file or a directory." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Reference the data uploaded to blob storage using DataReference\n", + "# Assign the datasource to blob_input_data variable\n", + "\n", + "# DataReference(datastore, \n", + "# data_reference_name=None, \n", + "# path_on_datastore=None, \n", + "# mode='mount', \n", + "# path_on_compute=None, \n", + "# overwrite=False)\n", + "\n", + "blob_input_data = DataReference(\n", + " datastore=def_blob_store,\n", + " data_reference_name=\"test_data\",\n", + " path_on_datastore=\"20newsgroups/20news.pkl\")\n", + "print(\"DataReference object created\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Intermediate/Output Data\n", + "Intermediate data (or output of a Step) is represented by a **[PipelineData](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.pipelinedata?view=azure-ml-py)** object.
PipelineData can be produced by one step and consumed in another step by providing the PipelineData object as an output of one step and the input of one or more steps.\n", + "\n", + "#### Constructing PipelineData\n", + "- **name:** [*Required*] Name of the data item within the pipeline graph\n", + "- **datastore_name:** Name of the Datastore to write this output to\n", + "- **output_name:** Name of the output\n", + "- **output_mode:** Specifies \"upload\" or \"mount\" modes for producing output (default: mount)\n", + "- **output_path_on_compute:** For \"upload\" mode, the path to which the module writes this output during execution\n", + "- **output_overwrite:** Flag to overwrite pre-existing data" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Define intermediate data using PipelineData\n", + "# Syntax\n", + "\n", + "# PipelineData(name, \n", + "# datastore=None, \n", + "# output_name=None, \n", + "# output_mode='mount', \n", + "# output_path_on_compute=None, \n", + "# output_overwrite=None, \n", + "# data_type=None, \n", + "# is_directory=None)\n", + "\n", + "# Naming the intermediate data as processed_data1 and assigning it to the variable processed_data1.\n", + "processed_data1 = PipelineData(\"processed_data1\",datastore=def_blob_store)\n", + "print(\"PipelineData object created\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Pipelines steps using datasources and intermediate data\n", + "Machine learning pipelines have many steps and these steps could use or reuse datasources and intermediate data. Here's how we construct such a pipeline:" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Define a Step that consumes a datasource and produces intermediate data.\n", + "In this step, we define a step that consumes a datasource and produces intermediate data.\n", + "\n", + "**Open `train.py` in the local machine and examine the arguments, inputs, and outputs for the script. That will give you a good sense of why the script argument names used below are important.** " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# step4 consumes the datasource (Datareference) in the previous step\n", + "# and produces processed_data1\n", + "trainStep = PythonScriptStep(\n", + " script_name=\"train.py\", \n", + " arguments=[\"--input_data\", blob_input_data, \"--output_train\", processed_data1],\n", + " inputs=[blob_input_data],\n", + " outputs=[processed_data1],\n", + " compute_target=aml_compute, \n", + " source_directory=project_folder\n", + ")\n", + "print(\"trainStep created\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Define a Step that consumes intermediate data and produces intermediate data\n", + "In this step, we define a step that consumes an intermediate data and produces intermediate data.\n", + "\n", + "**Open `extract.py` in the local machine and examine the arguments, inputs, and outputs for the script. 
That will give you a good sense of why the script argument names used below are important.** " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# step5 to use the intermediate data produced by step4\n", + "# This step also produces an output processed_data2\n", + "processed_data2 = PipelineData(\"processed_data2\", datastore=def_blob_store)\n", + "\n", + "extractStep = PythonScriptStep(\n", + " script_name=\"extract.py\",\n", + " arguments=[\"--input_extract\", processed_data1, \"--output_extract\", processed_data2],\n", + " inputs=[processed_data1],\n", + " outputs=[processed_data2],\n", + " compute_target=aml_compute, \n", + " source_directory=project_folder)\n", + "print(\"extractStep created\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Define a Step that consumes multiple intermediate data and produces intermediate data\n", + "In this step, we define a step that consumes multiple intermediate data and produces intermediate data.\n", + "\n", + "**Open `compare.py` in the local machine and examine the arguments, inputs, and outputs for the script. That will give you a good sense of why the script argument names used below are important.**" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Now define step6 that takes two inputs (both intermediate data), and produce an output\n", + "processed_data3 = PipelineData(\"processed_data3\", datastore=def_blob_store)\n", + "\n", + "compareStep = PythonScriptStep(\n", + " script_name=\"compare.py\",\n", + " arguments=[\"--compare_data1\", processed_data1, \"--compare_data2\", processed_data2, \"--output_compare\", processed_data3],\n", + " inputs=[processed_data1, processed_data2],\n", + " outputs=[processed_data3], \n", + " compute_target=aml_compute, \n", + " source_directory=project_folder)\n", + "print(\"compareStep created\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Build the pipeline" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "pipeline1 = Pipeline(workspace=ws, steps=[compareStep])\n", + "print (\"Pipeline is built\")\n", + "\n", + "pipeline1.validate()\n", + "print(\"Simple validation complete\") " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "pipeline_run1 = Experiment(ws, 'Data_dependency').submit(pipeline1)\n", + "print(\"Pipeline is submitted for execution\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "RunDetails(pipeline_run1).show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Next: Publishing the Pipeline and calling it from the REST endpoint\n", + "See this [notebook](./aml-pipelines-publish-and-run-using-rest-endpoint.ipynb) to understand how the pipeline is published and you can call the REST endpoint to run the pipeline." 
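Before moving on, you can also inspect the individual step runs of `pipeline_run1`, in the same way the ADLA notebook above examines its steps. The cell below is a minimal sketch that only uses run APIs already shown in these notebooks.

```python
# Wait for the pipeline to finish, then report the status and logs of each step run.
pipeline_run1.wait_for_completion(show_output=True)

for step_run in pipeline_run1.get_children():
    status = step_run.get_status()
    print('step:', step_run.name, 'status:', status)
    if status == "Failed":
        print('stdout log:', step_run.get_stdout_log())
        print('stderr log:', step_run.get_stderr_log())
```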
+ ] + } + ], + "metadata": { + "authors": [ + { + "name": "diray" + } + ], + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.6.7" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/pipeline/batch_scoring.py b/pipeline/batch_scoring.py new file mode 100644 index 000000000..dfd135bd6 --- /dev/null +++ b/pipeline/batch_scoring.py @@ -0,0 +1,119 @@ +# Copyright (c) Microsoft. All rights reserved. +# Licensed under the MIT license. + +import os +import argparse +import datetime +import time +import tensorflow as tf +from math import ceil +import numpy as np +import shutil +from tensorflow.contrib.slim.python.slim.nets import inception_v3 +from azureml.core.model import Model + +slim = tf.contrib.slim + +parser = argparse.ArgumentParser(description="Start a tensorflow model serving") +parser.add_argument('--model_name', dest="model_name", required=True) +parser.add_argument('--label_dir', dest="label_dir", required=True) +parser.add_argument('--dataset_path', dest="dataset_path", required=True) +parser.add_argument('--output_dir', dest="output_dir", required=True) +parser.add_argument('--batch_size', dest="batch_size", type=int, required=True) + +args = parser.parse_args() + +image_size = 299 +num_channel = 3 + +# create output directory if it does not exist +os.makedirs(args.output_dir, exist_ok=True) + + +def get_class_label_dict(label_file): + label = [] + proto_as_ascii_lines = tf.gfile.GFile(label_file).readlines() + for l in proto_as_ascii_lines: + label.append(l.rstrip()) + return label + + +class DataIterator: + def __init__(self, data_dir): + self.file_paths = [] + image_list = os.listdir(data_dir) + # total_size = len(image_list) + self.file_paths = [data_dir + '/' + file_name.rstrip() for file_name in image_list] + + self.labels = [1 for file_name in self.file_paths] + + @property + def size(self): + return len(self.labels) + + def input_pipeline(self, batch_size): + images_tensor = tf.convert_to_tensor(self.file_paths, dtype=tf.string) + labels_tensor = tf.convert_to_tensor(self.labels, dtype=tf.int64) + input_queue = tf.train.slice_input_producer([images_tensor, labels_tensor], shuffle=False) + labels = input_queue[1] + images_content = tf.read_file(input_queue[0]) + + image_reader = tf.image.decode_jpeg(images_content, channels=num_channel, name="jpeg_reader") + float_caster = tf.cast(image_reader, tf.float32) + new_size = tf.constant([image_size, image_size], dtype=tf.int32) + images = tf.image.resize_images(float_caster, new_size) + images = tf.divide(tf.subtract(images, [0]), [255]) + + image_batch, label_batch = tf.train.batch([images, labels], batch_size=batch_size, capacity=5 * batch_size) + return image_batch + + +def main(_): + # start_time = datetime.datetime.now() + label_file_name = os.path.join(args.label_dir, "labels.txt") + label_dict = get_class_label_dict(label_file_name) + classes_num = len(label_dict) + test_feeder = DataIterator(data_dir=args.dataset_path) + total_size = len(test_feeder.labels) + count = 0 + # get model from model registry + model_path = Model.get_model_path(args.model_name) + with tf.Session() as sess: + test_images = test_feeder.input_pipeline(batch_size=args.batch_size) + with slim.arg_scope(inception_v3.inception_v3_arg_scope()): + 
input_images = tf.placeholder(tf.float32, [args.batch_size, image_size, image_size, num_channel]) + logits, _ = inception_v3.inception_v3(input_images, + num_classes=classes_num, + is_training=False) + probabilities = tf.argmax(logits, 1) + + sess.run(tf.global_variables_initializer()) + sess.run(tf.local_variables_initializer()) + coord = tf.train.Coordinator() + threads = tf.train.start_queue_runners(sess=sess, coord=coord) + saver = tf.train.Saver() + saver.restore(sess, model_path) + out_filename = os.path.join(args.output_dir, "result-labels.txt") + with open(out_filename, "w") as result_file: + i = 0 + while count < total_size and not coord.should_stop(): + test_images_batch = sess.run(test_images) + file_names_batch = test_feeder.file_paths[i * args.batch_size: + min(test_feeder.size, (i + 1) * args.batch_size)] + results = sess.run(probabilities, feed_dict={input_images: test_images_batch}) + new_add = min(args.batch_size, total_size - count) + count += new_add + i += 1 + for j in range(new_add): + result_file.write(os.path.basename(file_names_batch[j]) + ": " + label_dict[results[j]] + "\n") + result_file.flush() + coord.request_stop() + coord.join(threads) + + # copy the file to artifacts + shutil.copy(out_filename, "./outputs/") + # Move the processed data out of the blob so that the next run can process the data. + + +if __name__ == "__main__": + tf.app.run() diff --git a/pipeline/compare.py b/pipeline/compare.py new file mode 100644 index 000000000..21054c0e2 --- /dev/null +++ b/pipeline/compare.py @@ -0,0 +1,22 @@ +# Copyright (c) Microsoft. All rights reserved. +# Licensed under the MIT license. + +import argparse +import os + +print("In compare.py") +print("As a data scientist, this is where I use my compare code.") +parser = argparse.ArgumentParser("compare") +parser.add_argument("--compare_data1", type=str, help="compare_data1 data") +parser.add_argument("--compare_data2", type=str, help="compare_data2 data") +parser.add_argument("--output_compare", type=str, help="output_compare directory") + +args = parser.parse_args() + +print("Argument 1: %s" % args.compare_data1) +print("Argument 2: %s" % args.compare_data2) +print("Argument 3: %s" % args.output_compare) + +if not (args.output_compare is None): + os.makedirs(args.output_compare, exist_ok=True) + print("%s created" % args.output_compare) diff --git a/pipeline/extract.py b/pipeline/extract.py new file mode 100644 index 000000000..0134a0904 --- /dev/null +++ b/pipeline/extract.py @@ -0,0 +1,21 @@ +# Copyright (c) Microsoft. All rights reserved. +# Licensed under the MIT license. + +import argparse +import os + +print("In extract.py") +print("As a data scientist, this is where I use my extract code.") + +parser = argparse.ArgumentParser("extract") +parser.add_argument("--input_extract", type=str, help="input_extract data") +parser.add_argument("--output_extract", type=str, help="output_extract directory") + +args = parser.parse_args() + +print("Argument 1: %s" % args.input_extract) +print("Argument 2: %s" % args.output_extract) + +if not (args.output_extract is None): + os.makedirs(args.output_extract, exist_ok=True) + print("%s created" % args.output_extract) diff --git a/pipeline/pipeline-batch-scoring.ipynb b/pipeline/pipeline-batch-scoring.ipynb index 85eb9789e..db73c5133 100644 --- a/pipeline/pipeline-batch-scoring.ipynb +++ b/pipeline/pipeline-batch-scoring.ipynb @@ -4,8 +4,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "Copyright (c) Microsoft Corporation. 
All rights reserved.\n", - "\n", + "Copyright (c) Microsoft Corporation. All rights reserved. \n", "Licensed under the MIT License." ] }, @@ -13,7 +12,15 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "This notebook demonstrates how to run batch scoring job. __[Inception-V3 model](https://arxiv.org/abs/1512.00567)__ and unlabeled images from __[ImageNet](http://image-net.org/)__ dataset will be used. It registers a pretrained inception model in model registry then uses the model to do batch scoring on images in a blob container." + "# Using Azure Machine Learning Pipelines for batch prediction\n", + "\n", + "In this notebook we will demonstrate how to run a batch scoring job using Azure Machine Learning pipelines. Our example job will be to take an already-trained image classification model, and run that model on some unlabeled images. The image classification model that we'll use is the __[Inception-V3 model](https://arxiv.org/abs/1512.00567)__ and we'll run this model on unlabeled images from the __[ImageNet](http://image-net.org/)__ dataset. \n", + "\n", + "The outline of this notebook is as follows:\n", + "\n", + "- Register the pretrained inception model into the model registry. \n", + "- Store the dataset images in a blob container.\n", + "- Use the registered model to do batch scoring on the images in the data blob container." ] }, { @@ -21,7 +28,24 @@ "metadata": {}, "source": [ "## Prerequisites\n", - "Make sure you go through the [00. Installation and Configuration](./00.configuration.ipynb) Notebook first if you haven't.\n" + "Make sure you go through the configuration Notebook located at https://github.com/Azure/MachineLearningNotebooks first if you haven't. This sets you up with a working config file that has information on your workspace, subscription id, etc. " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from azureml.core import Datastore\n", + "from azureml.core import Experiment\n", + "from azureml.core.compute import AmlCompute, ComputeTarget\n", + "from azureml.core.conda_dependencies import CondaDependencies\n", + "from azureml.core.datastore import Datastore\n", + "from azureml.core.runconfig import CondaDependencies, RunConfiguration\n", + "from azureml.data.data_reference import DataReference\n", + "from azureml.pipeline.core import Pipeline, PipelineData\n", + "from azureml.pipeline.steps import PythonScriptStep" ] }, { @@ -37,13 +61,28 @@ "print('Workspace name: ' + ws.name, \n", " 'Azure region: ' + ws.location, \n", " 'Subscription id: ' + ws.subscription_id, \n", - " 'Resource group: ' + ws.resource_group, sep = '\\n')\n", + " 'Resource group: ' + ws.resource_group, sep = '\\n')\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Set up machine learning resources" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Set up datastores\n", + "First, let’s access the datastore that has the model, labels, and images. \n", "\n", - "# Also create a Project and attach to Workspace\n", - "scripts_folder = \"scripts\"\n", + "### Create a datastore that points to a blob container containing sample images\n", "\n", - "if not os.path.isdir(scripts_folder):\n", - " os.mkdir(scripts_folder)" + "We have created a public blob container `sampledata` on an account named `pipelinedata`, containing images from the ImageNet evaluation set. 
In the next step, we create a datastore with the name `images_datastore`, which points to this container. In the call to `register_azure_blob_container` below, setting the `overwrite` flag to `True` overwrites any datastore that was created previously with that name. \n", + "\n", + "This step can be changed to point to your blob container by providing your own `datastore_name`, `container_name`, and `account_name`." ] }, { @@ -52,20 +91,22 @@ "metadata": {}, "outputs": [], "source": [ - "from azureml.core.compute import AmlCompute, ComputeTarget\n", - "from azureml.core.datastore import Datastore\n", - "from azureml.data.data_reference import DataReference\n", - "from azureml.pipeline.core import Pipeline, PipelineData\n", - "from azureml.pipeline.steps import PythonScriptStep\n", - "from azureml.core.runconfig import CondaDependencies, RunConfiguration" + "account_name = \"pipelinedata\"\n", + "datastore_name=\"images_datastore\"\n", + "container_name=\"sampledata\"\n", + "\n", + "batchscore_blob = Datastore.register_azure_blob_container(ws, \n", + " datastore_name=datastore_name, \n", + " container_name= container_name, \n", + " account_name=account_name, \n", + " overwrite=True)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "## Create and attach Compute targets\n", - "Use the below code to create and attach Compute targets. " + "Next, let’s specify the default datastore for the outputs." ] }, { @@ -74,49 +115,49 @@ "metadata": {}, "outputs": [], "source": [ - "import os\n", - "\n", - "# choose a name for your cluster\n", - "compute_name = os.environ.get(\"BATCHAI_CLUSTER_NAME\", \"gpucluster\")\n", - "compute_min_nodes = os.environ.get(\"BATCHAI_CLUSTER_MIN_NODES\", 0)\n", - "compute_max_nodes = os.environ.get(\"BATCHAI_CLUSTER_MAX_NODES\", 4)\n", - "vm_size = os.environ.get(\"BATCHAI_CLUSTER_SKU\", \"STANDARD_NC6\")\n", - "\n", - "\n", - "if compute_name in ws.compute_targets:\n", - " compute_target = ws.compute_targets[compute_name]\n", - " if compute_target and type(compute_target) is AmlCompute:\n", - " print('found compute target. just use it. ' + compute_name)\n", - "else:\n", - " print('creating a new compute target...')\n", - " provisioning_config = AmlCompute.provisioning_configuration(vm_size = vm_size, # NC6 is GPU-enabled\n", - " vm_priority = 'lowpriority', # optional\n", - " min_nodes = compute_min_nodes, \n", - " max_nodes = compute_max_nodes)\n", - "\n", - " # create the cluster\n", - " compute_target = ComputeTarget.create(ws, compute_name, provisioning_config)\n", - " \n", - " # can poll for a minimum number of nodes and for a specific timeout. \n", - " # if no min node count is provided it will use the scale settings for the cluster\n", - " compute_target.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)\n", - " \n", - " # For a more detailed view of current BatchAI cluster status, use the 'status' property \n", - " print(compute_target.status.serialize())" + "def_data_store = ws.get_default_datastore()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "# Python scripts to run" + "### Configure data references\n", + "Now you need to add references to the data, as inputs to the appropriate pipeline steps in your pipeline. A data source in a pipeline is represented by a DataReference object. The DataReference object points to data that lives in, or is accessible from, a datastore. 
We need DataReference objects corresponding to the following: the directory containing the input images, the directory in which the pretrained model is stored, the directory containing the labels, and the output directory." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "input_images = DataReference(datastore=batchscore_blob, \n", + " data_reference_name=\"input_images\",\n", + " path_on_datastore=\"batchscoring/images\",\n", + " mode=\"download\"\n", + " )\n", + "model_dir = DataReference(datastore=batchscore_blob, \n", + " data_reference_name=\"input_model\",\n", + " path_on_datastore=\"batchscoring/models\",\n", + " mode=\"download\" \n", + " )\n", + "label_dir = DataReference(datastore=batchscore_blob, \n", + " data_reference_name=\"input_labels\",\n", + " path_on_datastore=\"batchscoring/labels\",\n", + " mode=\"download\" \n", + " )\n", + "output_dir = PipelineData(name=\"scores\", \n", + " datastore=def_data_store, \n", + " output_path_on_compute=\"batchscoring/results\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "Python scripts that run the batch scoring. `batchai_score.py` takes input images in `dataset_path`, pretrained models in `model_dir` and outputs a `results-label.txt` to `output_dir`." + "### Create and attach Compute targets\n", + "Use the below code to create and attach Compute targets. " ] }, { @@ -125,134 +166,51 @@ "metadata": {}, "outputs": [], "source": [ - "%%writefile $scripts_folder/batchai_score.py\n", "import os\n", - "import argparse\n", - "import datetime,time\n", - "import tensorflow as tf\n", - "from math import ceil\n", - "import numpy as np\n", - "import shutil\n", - "from tensorflow.contrib.slim.python.slim.nets import inception_v3\n", - "from azureml.core.model import Model\n", - "\n", - "slim = tf.contrib.slim\n", - "\n", - "parser = argparse.ArgumentParser(description=\"Start a tensorflow model serving\")\n", - "parser.add_argument('--model_name', dest=\"model_name\", required=True)\n", - "parser.add_argument('--label_dir', dest=\"label_dir\", required=True)\n", - "parser.add_argument('--dataset_path', dest=\"dataset_path\", required=True)\n", - "parser.add_argument('--output_dir', dest=\"output_dir\", required=True)\n", - "parser.add_argument('--batch_size', dest=\"batch_size\", type=int, required=True)\n", - "\n", - "args = parser.parse_args()\n", - "\n", - "image_size = 299\n", - "num_channel = 3\n", - "\n", - "# create output directory if it does not exist\n", - "os.makedirs(args.output_dir, exist_ok=True)\n", - "\n", - "def get_class_label_dict(label_file):\n", - " label = []\n", - " proto_as_ascii_lines = tf.gfile.GFile(label_file).readlines()\n", - " for l in proto_as_ascii_lines:\n", - " label.append(l.rstrip())\n", - " return label\n", - "\n", - "\n", - "class DataIterator:\n", - " def __init__(self, data_dir):\n", - " self.file_paths = []\n", - " image_list = os.listdir(data_dir)\n", - " total_size = len(image_list)\n", - " self.file_paths = [data_dir + '/' + file_name.rstrip() for file_name in image_list ]\n", - "\n", - " self.labels = [1 for file_name in self.file_paths]\n", - "\n", - " @property\n", - " def size(self):\n", - " return len(self.labels)\n", "\n", - " def input_pipeline(self, batch_size):\n", - " images_tensor = tf.convert_to_tensor(self.file_paths, dtype=tf.string)\n", - " labels_tensor = tf.convert_to_tensor(self.labels, dtype=tf.int64)\n", - " input_queue = tf.train.slice_input_producer([images_tensor, labels_tensor], shuffle=False)\n", 
- " labels = input_queue[1]\n", - " images_content = tf.read_file(input_queue[0])\n", - "\n", - " image_reader = tf.image.decode_jpeg(images_content, channels=num_channel, name=\"jpeg_reader\")\n", - " float_caster = tf.cast(image_reader, tf.float32)\n", - " new_size = tf.constant([image_size, image_size], dtype=tf.int32)\n", - " images = tf.image.resize_images(float_caster, new_size)\n", - " images = tf.divide(tf.subtract(images, [0]), [255])\n", - "\n", - " image_batch, label_batch = tf.train.batch([images, labels], batch_size=batch_size, capacity=5 * batch_size)\n", - " return image_batch\n", + "# choose a name for your cluster\n", + "aml_compute_name = os.environ.get(\"AML_COMPUTE_NAME\", \"gpu-cluster\")\n", + "cluster_min_nodes = os.environ.get(\"AML_COMPUTE_MIN_NODES\", 0)\n", + "cluster_max_nodes = os.environ.get(\"AML_COMPUTE_MAX_NODES\", 1)\n", + "vm_size = os.environ.get(\"AML_COMPUTE_SKU\", \"STANDARD_NC6\")\n", "\n", - "def main(_):\n", - " start_time = datetime.datetime.now()\n", - " label_file_name = os.path.join(args.label_dir, \"labels.txt\")\n", - " label_dict = get_class_label_dict(label_file_name)\n", - " classes_num = len(label_dict)\n", - " test_feeder = DataIterator(data_dir=args.dataset_path)\n", - " total_size = len(test_feeder.labels)\n", - " count = 0\n", - " # get model from model registry\n", - " model_path = Model.get_model_path(args.model_name)\n", - " with tf.Session() as sess:\n", - " test_images = test_feeder.input_pipeline(batch_size=args.batch_size)\n", - " with slim.arg_scope(inception_v3.inception_v3_arg_scope()):\n", - " input_images = tf.placeholder(tf.float32, [args.batch_size, image_size, image_size, num_channel])\n", - " logits, _ = inception_v3.inception_v3(input_images,\n", - " num_classes=classes_num,\n", - " is_training=False)\n", - " probabilities = tf.argmax(logits, 1)\n", "\n", - " sess.run(tf.global_variables_initializer())\n", - " sess.run(tf.local_variables_initializer())\n", - " coord = tf.train.Coordinator()\n", - " threads = tf.train.start_queue_runners(sess=sess, coord=coord)\n", - " saver = tf.train.Saver()\n", - " saver.restore(sess, model_path)\n", - " out_filename = os.path.join(args.output_dir, \"result-labels.txt\")\n", - " with open(out_filename, \"w\") as result_file:\n", - " i = 0\n", - " while count < total_size and not coord.should_stop():\n", - " test_images_batch = sess.run(test_images)\n", - " file_names_batch = test_feeder.file_paths[i*args.batch_size: min(test_feeder.size, (i+1)*args.batch_size)]\n", - " results = sess.run(probabilities, feed_dict={input_images: test_images_batch})\n", - " new_add = min(args.batch_size, total_size-count)\n", - " count += new_add\n", - " i += 1\n", - " for j in range(new_add):\n", - " result_file.write(os.path.basename(file_names_batch[j]) + \": \" + label_dict[results[j]] + \"\\n\")\n", - " result_file.flush()\n", - " coord.request_stop()\n", - " coord.join(threads)\n", - " \n", - " # copy the file to artifacts\n", - " shutil.copy(out_filename, \"./outputs/\")\n", - " # Move the processed data out of the blob so that the next run can process the data.\n", + "if aml_compute_name in ws.compute_targets:\n", + " compute_target = ws.compute_targets[aml_compute_name]\n", + " if compute_target and type(compute_target) is AmlCompute:\n", + " print('found compute target. just use it. 
' + aml_compute_name)\n", + "else:\n", + " print('creating a new compute target...')\n", + " provisioning_config = AmlCompute.provisioning_configuration(vm_size = vm_size, # NC6 is GPU-enabled\n", + " vm_priority = 'lowpriority', # optional\n", + " min_nodes = cluster_min_nodes, \n", + " max_nodes = cluster_max_nodes)\n", "\n", - "if __name__ == \"__main__\":\n", - " tf.app.run()" + " # create the cluster\n", + " compute_target = ComputeTarget.create(ws, aml_compute_name, provisioning_config)\n", + " \n", + " # can poll for a minimum number of nodes and for a specific timeout. \n", + " # if no min node count is provided it will use the scale settings for the cluster\n", + " compute_target.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)\n", + " \n", + " # For a more detailed view of current Azure Machine Learning Compute status, use the 'status' property \n", + " print(compute_target.status.serialize())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "## Prepare Model and Input data" + "## Prepare the Model" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "### Download Model\n", + "### Download the Model\n", "\n", - "Download and extract model from http://download.tensorflow.org/models/inception_v3_2016_08_28.tar.gz to `\"models\"`" + "Download and extract the model from http://download.tensorflow.org/models/inception_v3_2016_08_28.tar.gz to `\"models\"`" ] }, { @@ -286,11 +244,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "### Create a datastore that points to blob container containing sample images\n", - "\n", - "We have created a public blob container `sampledata` on an account named `pipelinedata` containing images from ImageNet evaluation set. In the next step, we create a datastore with name `images_datastore` that points to this container. The `overwrite=True` step overwrites any datastore that was created previously with that name. \n", - "\n", - "This step can be changed to point to your blob container by providing an additional `account_key` parameter with `account_name`. 
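The download cell itself is elided from this hunk. As a minimal sketch (assuming the checkpoint should land in a local `models` directory, matching the registration cell that follows), the download and extraction could look like this:

```python
import os
import tarfile
import urllib.request

model_url = "http://download.tensorflow.org/models/inception_v3_2016_08_28.tar.gz"
local_dir = "models"
os.makedirs(local_dir, exist_ok=True)

# fetch the tarball and extract inception_v3.ckpt into ./models
archive_path = os.path.join(local_dir, "inception_v3_2016_08_28.tar.gz")
urllib.request.urlretrieve(model_url, archive_path)
with tarfile.open(archive_path, "r:gz") as tar:
    tar.extractall(path=local_dir)
```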
" + "### Register the model with Workspace" ] }, { @@ -299,112 +253,57 @@ "metadata": {}, "outputs": [], "source": [ - "account_name = \"pipelinedata\"\n", - "sample_data = Datastore.register_azure_blob_container(ws, datastore_name=\"images_datastore\", container_name=\"sampledata\", \n", - " account_name=account_name, \n", - " overwrite=True)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "# Output datastore" + "import shutil\n", + "from azureml.core.model import Model\n", + "\n", + "# register downloaded model \n", + "model = Model.register(model_path = \"models/inception_v3.ckpt\",\n", + " model_name = \"inception\", # this is the name the model is registered as\n", + " tags = {'pretrained': \"inception\"},\n", + " description = \"Imagenet trained tensorflow inception\",\n", + " workspace = ws)\n", + "# remove the downloaded dir after registration if you wish\n", + "shutil.rmtree(\"models\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "We write the outputs to the default datastore" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "default_ds = ws.get_default_datastore()" + "## Write your scoring script" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "# Specify where the data is stored or will be written to" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "from azureml.core.conda_dependencies import CondaDependencies\n", - "from azureml.data.data_reference import DataReference\n", - "from azureml.pipeline.core import Pipeline, PipelineData\n", - "from azureml.core import Datastore\n", - "from azureml.core import Experiment" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "input_images = DataReference(datastore=sample_data, \n", - " data_reference_name=\"input_images\",\n", - " path_on_datastore=\"batchscoring/images\",\n", - " mode=\"download\"\n", - " )\n", - "model_dir = DataReference(datastore=sample_data, \n", - " data_reference_name=\"input_model\",\n", - " path_on_datastore=\"batchscoring/models\",\n", - " mode=\"download\" \n", - " )\n", - "label_dir = DataReference(datastore=sample_data, \n", - " data_reference_name=\"input_labels\",\n", - " path_on_datastore=\"batchscoring/labels\",\n", - " mode=\"download\" \n", - " )\n", - "output_dir = PipelineData(name=\"scores\", \n", - " datastore=default_ds, \n", - " output_path_on_compute=\"batchscoring/results\")" + "To do the scoring, we use a batch scoring script `batch_scoring.py`, which is located in the same directory that this notebook is in. 
You can take a look at this script to see how you might modify it for your custom batch scoring task.\n", + "\n", + "The python script `batch_scoring.py` takes input images, applies the image classification model to these images, and outputs a classification result to a results file.\n", + "\n", + "The script `batch_scoring.py` takes the following parameters:\n", + "\n", + "- `--model_name`: the name of the model being used, which is expected to be in the `model_dir` directory\n", + "- `--label_dir` : the directory holding the `labels.txt` file \n", + "- `--dataset_path`: the directory containing the input images\n", + "- `--output_dir` : the script will run the model on the data and output a `results-label.txt` to this directory\n", + "- `--batch_size` : the batch size used in running the model.\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "## Register the model with Workspace" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "import shutil\n", - "from azureml.core.model import Model\n", - "\n", - "# register downloaded model \n", - "model = Model.register(model_path = \"models/inception_v3.ckpt\",\n", - " model_name = \"inception\", # this is the name the model is registered as\n", - " tags = {'pretrained': \"inception\"},\n", - " description = \"Imagenet trained tensorflow inception\",\n", - " workspace = ws)\n", - "# remove the downloaded dir after registration if you wish\n", - "shutil.rmtree(\"models\")" + "## Build and run the batch scoring pipeline\n", + "You have everything you need to build the pipeline. Let’s put all these together." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "# Specify environment to run the script" + "### Specify the environment to run the script\n", + "Specify the conda dependencies for your script. You will need this object when you create the pipeline step later on." ] }, { @@ -418,24 +317,18 @@ "cd = CondaDependencies.create(pip_packages=[\"tensorflow-gpu==1.10.0\", \"azureml-defaults\"])\n", "\n", "# Runconfig\n", - "batchai_run_config = RunConfiguration(conda_dependencies=cd)\n", - "batchai_run_config.environment.docker.enabled = True\n", - "batchai_run_config.environment.docker.gpu_support = True\n", - "batchai_run_config.environment.docker.base_image = DEFAULT_GPU_IMAGE\n", - "batchai_run_config.environment.spark.precache_packages = False" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "# Steps to run" + "amlcompute_run_config = RunConfiguration(conda_dependencies=cd)\n", + "amlcompute_run_config.environment.docker.enabled = True\n", + "amlcompute_run_config.environment.docker.gpu_support = True\n", + "amlcompute_run_config.environment.docker.base_image = DEFAULT_GPU_IMAGE\n", + "amlcompute_run_config.environment.spark.precache_packages = False" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ + "### Specify the parameters for your pipeline\n", "A subset of the parameters to the python script can be given as input when we re-run a `PublishedPipeline`. In the current example, we define `batch_size` taken by the script as such parameter." ] }, @@ -449,6 +342,14 @@ "batch_size_param = PipelineParameter(name=\"param_batch_size\", default_value=20)" ] }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Create the pipeline step\n", + "Create the pipeline step using the script, environment configuration, and parameters. 
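The step will hand these arguments to `batch_scoring.py`; since that script now lives alongside the notebook rather than being written inline, here is a rough sketch of the interface it is expected to expose (illustrative only, not the shipped file):

```python
# Sketch of batch_scoring.py's expected argument interface (illustrative, not the shipped script)
import argparse
import os

parser = argparse.ArgumentParser(description="Batch scoring with a registered TensorFlow model")
parser.add_argument('--model_name', required=True)    # registered model name, resolved via Model.get_model_path
parser.add_argument('--label_dir', required=True)     # directory containing labels.txt
parser.add_argument('--dataset_path', required=True)  # directory of input images
parser.add_argument('--output_dir', required=True)    # result-labels.txt is written here
parser.add_argument('--batch_size', type=int, required=True)
args = parser.parse_args()

os.makedirs(args.output_dir, exist_ok=True)
# ... load the model, score the images in batches, and write one "<file>: <label>" line per image
```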
Specify the compute target you already attached to your workspace as the target of execution of the script. We will use PythonScriptStep to create the pipeline step." + ] + }, { "cell_type": "code", "execution_count": null, @@ -458,8 +359,8 @@ "inception_model_name = \"inception_v3.ckpt\"\n", "\n", "batch_score_step = PythonScriptStep(\n", - " name=\"batch ai scoring\",\n", - " script_name=\"batchai_score.py\",\n", + " name=\"batch_scoring\",\n", + " script_name=\"batch_scoring.py\",\n", " arguments=[\"--dataset_path\", input_images, \n", " \"--model_name\", \"inception\",\n", " \"--label_dir\", label_dir, \n", @@ -468,11 +369,18 @@ " compute_target=compute_target,\n", " inputs=[input_images, label_dir],\n", " outputs=[output_dir],\n", - " runconfig=batchai_run_config,\n", - " source_directory=scripts_folder\n", + " runconfig=amlcompute_run_config\n", ")" ] }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Run the pipeline\n", + "At this point you can run the pipeline and examine the output it produced. " + ] + }, { "cell_type": "code", "execution_count": null, @@ -487,7 +395,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "# Monitor run" + "### Monitor the run" ] }, { @@ -513,7 +421,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "# Download and review output" + "### Download and review output" ] }, { @@ -542,14 +450,15 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "# Publish a pipeline and rerun using a REST call" + "## Publish a pipeline and rerun using a REST call" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "## Create a published pipeline" + "### Create a published pipeline\n", + "Once you are satisfied with the outcome of the run, you can publish the pipeline to run it with different input values later. When you publish a pipeline, you will get a REST endpoint that accepts invoking of the pipeline with the set of parameters you have already incorporated above using PipelineParameter." 
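Once published (the cell below creates `published_pipeline`), rerunning the pipeline from any client is a single authenticated POST. A sketch, mirroring the REST calls shown in the style-transfer notebook later in this diff; the experiment name and parameter value here are illustrative:

```python
import requests
from azureml.core.authentication import AzureCliAuthentication

# assumes `published_pipeline` has been created by the publish cell below
aad_token = AzureCliAuthentication().get_authentication_header()
response = requests.post(published_pipeline.endpoint,
                         headers=aad_token,
                         json={"ExperimentName": "batch_scoring",  # illustrative name
                               "ParameterAssignments": {"param_batch_size": 50}})
print(response.json()["Id"])  # id of the newly submitted pipeline run
```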
] }, { @@ -559,7 +468,7 @@ "outputs": [], "source": [ "published_pipeline = pipeline_run.publish_pipeline(\n", - " name=\"Inception v3 scoring\", description=\"Batch scoring using Inception v3 model\", version=\"1.0\")\n", + " name=\"Inception_v3_scoring\", description=\"Batch scoring using Inception v3 model\", version=\"1.0\")\n", "\n", "published_id = published_pipeline.id" ] @@ -568,14 +477,14 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "## Rerun using REST call" + "## Rerun the pipeline using the REST endpoint" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "## Get AAD token" + "### Get AAD token" ] }, { @@ -595,7 +504,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "## Run published pipeline using its REST endpoint" + "### Run published pipeline" ] }, { @@ -619,7 +528,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "## Monitor the new run" + "### Monitor the new run" ] }, { @@ -642,9 +551,9 @@ } ], "kernelspec": { - "display_name": "Python 3.6", + "display_name": "Python 3", "language": "python", - "name": "python36" + "name": "python3" }, "language_info": { "codemirror_mode": { @@ -656,7 +565,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.6.2" + "version": "3.6.7" } }, "nbformat": 4, diff --git a/pipeline/pipeline-style-transfer.ipynb b/pipeline/pipeline-style-transfer.ipynb new file mode 100644 index 000000000..b1b6674c1 --- /dev/null +++ b/pipeline/pipeline-style-transfer.ipynb @@ -0,0 +1,610 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Copyright (c) Microsoft Corporation. All rights reserved.\n", + "\n", + "Licensed under the MIT License." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Neural style transfer on video\n", + "Using modified code from `pytorch`'s neural style [example](https://pytorch.org/tutorials/advanced/neural_style_tutorial.html), we show how to setup a pipeline for doing style transfer on video. The pipeline has following steps:\n", + "1. Split a video into images\n", + "2. Run neural style on each image using one of the provided models (from `pytorch` pretrained models for this example).\n", + "3. Stitch the image back into a video." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Prerequisites\n", + "Make sure you go through the configuration Notebook located at https://github.com/Azure/MachineLearningNotebooks first if you haven't. This sets you up with a working config file that has information on your workspace, subscription id, etc. " + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Initialize Workspace\n", + "\n", + "Initialize a workspace object from persisted configuration." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "from azureml.core import Workspace, Run, Experiment\n", + "\n", + "ws = Workspace.from_config()\n", + "print('Workspace name: ' + ws.name, \n", + " 'Azure region: ' + ws.location, \n", + " 'Subscription id: ' + ws.subscription_id, \n", + " 'Resource group: ' + ws.resource_group, sep = '\\n')\n", + "\n", + "scripts_folder = \"scripts_folder\"\n", + "\n", + "if not os.path.isdir(scripts_folder):\n", + " os.mkdir(scripts_folder)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from azureml.core.compute import AmlCompute, ComputeTarget\n", + "from azureml.core.datastore import Datastore\n", + "from azureml.data.data_reference import DataReference\n", + "from azureml.pipeline.core import Pipeline, PipelineData\n", + "from azureml.pipeline.steps import PythonScriptStep, MpiStep\n", + "from azureml.core.runconfig import CondaDependencies, RunConfiguration" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Create or use existing compute" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# AmlCompute\n", + "cpu_cluster_name = \"cpucluster\"\n", + "try:\n", + " cpu_cluster = AmlCompute(ws, cpu_cluster_name)\n", + " print(\"found existing cluster.\")\n", + "except:\n", + " print(\"creating new cluster\")\n", + " provisioning_config = AmlCompute.provisioning_configuration(vm_size = \"STANDARD_D2_v2\",\n", + " max_nodes = 1)\n", + "\n", + " # create the cluster\n", + " cpu_cluster = ComputeTarget.create(ws, cpu_cluster_name, provisioning_config)\n", + " cpu_cluster.wait_for_completion(show_output=True)\n", + " \n", + "# AmlCompute\n", + "gpu_cluster_name = \"gpucluster\"\n", + "try:\n", + " gpu_cluster = AmlCompute(ws, gpu_cluster_name)\n", + " print(\"found existing cluster.\")\n", + "except:\n", + " print(\"creating new cluster\")\n", + " provisioning_config = AmlCompute.provisioning_configuration(vm_size = \"STANDARD_NC6\",\n", + " max_nodes = 3)\n", + "\n", + " # create the cluster\n", + " gpu_cluster = ComputeTarget.create(ws, gpu_cluster_name, provisioning_config)\n", + " gpu_cluster.wait_for_completion(show_output=True)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Python Scripts\n", + "We use an edited version of `neural_style_mpi.py` (original is [here](https://github.com/pytorch/examples/blob/master/fast_neural_style/neural_style/neural_style_mpi.py)). Scripts to split and stitch the video are thin wrappers to calls to `ffmpeg`. \n", + "\n", + "We install `ffmpeg` through conda dependencies." 
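The edited `neural_style_mpi.py` is copied in from the repository rather than written inline. Judging from the arguments passed to the MPI step further down in this notebook, its command-line surface is roughly the following (a sketch, not the actual script):

```python
# Sketch of neural_style_mpi.py's expected arguments (illustrative only)
import argparse

parser = argparse.ArgumentParser(description="Distributed fast neural style transfer")
parser.add_argument('--content-dir', required=True)  # frames extracted by process_video.py
parser.add_argument('--output-dir', required=True)   # where styled frames are written
parser.add_argument('--model-dir', required=True)    # pretrained style models (candy, mosaic, ...)
parser.add_argument('--style', required=True)        # which of the saved models to apply
parser.add_argument('--cuda', type=int, default=1)   # 1 to run on GPU
args = parser.parse_args()
# ... each MPI rank styles its share of the frames and writes them to --output-dir
```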
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import shutil\n", + "shutil.copy(\"neural_style_mpi.py\", scripts_folder)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%%writefile $scripts_folder/process_video.py\n", + "import argparse\n", + "import glob\n", + "import os\n", + "import subprocess\n", + "\n", + "parser = argparse.ArgumentParser(description=\"Process input video\")\n", + "parser.add_argument('--input_video', required=True)\n", + "parser.add_argument('--output_audio', required=True)\n", + "parser.add_argument('--output_images', required=True)\n", + "\n", + "args = parser.parse_args()\n", + "\n", + "os.makedirs(args.output_audio, exist_ok=True)\n", + "os.makedirs(args.output_images, exist_ok=True)\n", + "\n", + "subprocess.run(\"ffmpeg -i {} {}/video.aac\"\n", + " .format(args.input_video, args.output_audio),\n", + " shell=True, check=True\n", + " )\n", + "\n", + "subprocess.run(\"ffmpeg -i {} {}/%05d_video.jpg -hide_banner\"\n", + " .format(args.input_video, args.output_images),\n", + " shell=True, check=True\n", + " )" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%%writefile $scripts_folder/stitch_video.py\n", + "import argparse\n", + "import os\n", + "import subprocess\n", + "\n", + "parser = argparse.ArgumentParser(description=\"Process input video\")\n", + "parser.add_argument('--images_dir', required=True)\n", + "parser.add_argument('--input_audio', required=True)\n", + "parser.add_argument('--output_dir', required=True)\n", + "\n", + "args = parser.parse_args()\n", + "\n", + "os.makedirs(args.output_dir, exist_ok=True)\n", + "\n", + "subprocess.run(\"ffmpeg -framerate 30 -i {}/%05d_video.jpg -c:v libx264 -profile:v high -crf 20 -pix_fmt yuv420p \"\n", + " \"-y {}/video_without_audio.mp4\"\n", + " .format(args.images_dir, args.output_dir),\n", + " shell=True, check=True\n", + " )\n", + "\n", + "subprocess.run(\"ffmpeg -i {}/video_without_audio.mp4 -i {}/video.aac -map 0:0 -map 1:0 -vcodec \"\n", + " \"copy -acodec copy -y {}/video_with_audio.mp4\"\n", + " .format(args.output_dir, args.input_audio, args.output_dir),\n", + " shell=True, check=True\n", + " )" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# datastore for input video\n", + "account_name = \"happypathspublic\"\n", + "video_ds = Datastore.register_azure_blob_container(ws, \"videos\", \"videos\",\n", + " account_name=account_name, overwrite=True)\n", + "\n", + "# datastore for models\n", + "models_ds = Datastore.register_azure_blob_container(ws, \"models\", \"styletransfer\", \n", + " account_name=\"pipelinedata\", \n", + " overwrite=True)\n", + " \n", + "# downloaded models from https://pytorch.org/tutorials/advanced/neural_style_tutorial.html are kept here\n", + "models_dir = DataReference(data_reference_name=\"models\", datastore=models_ds, \n", + " path_on_datastore=\"saved_models\", mode=\"download\")\n", + "\n", + "# the default blob store attached to a workspace\n", + "default_datastore = ws.get_default_datastore()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Sample video" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "orangutan_video = DataReference(datastore=video_ds,\n", + " data_reference_name=\"video\",\n", + " 
path_on_datastore=\"orangutan.mp4\", mode=\"download\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "cd = CondaDependencies()\n", + "\n", + "cd.add_channel(\"conda-forge\")\n", + "cd.add_conda_package(\"ffmpeg\")\n", + "\n", + "cd.add_channel(\"pytorch\")\n", + "cd.add_conda_package(\"pytorch\")\n", + "cd.add_conda_package(\"torchvision\")\n", + "\n", + "# Runconfig\n", + "amlcompute_run_config = RunConfiguration(conda_dependencies=cd)\n", + "amlcompute_run_config.environment.docker.enabled = True\n", + "amlcompute_run_config.environment.docker.gpu_support = True\n", + "amlcompute_run_config.environment.docker.base_image = \"pytorch/pytorch\"\n", + "amlcompute_run_config.environment.spark.precache_packages = False" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "ffmpeg_audio = PipelineData(name=\"ffmpeg_audio\", datastore=default_datastore)\n", + "ffmpeg_images = PipelineData(name=\"ffmpeg_images\", datastore=default_datastore)\n", + "processed_images = PipelineData(name=\"processed_images\", datastore=default_datastore)\n", + "output_video = PipelineData(name=\"output_video\", datastore=default_datastore)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Define tweakable parameters to pipeline\n", + "These parameters can be changed when the pipeline is published and rerun from a REST call" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from azureml.pipeline.core.graph import PipelineParameter\n", + "# create a parameter for style (one of \"candy\", \"mosaic\", \"rain_princess\", \"udnie\") to transfer the images to\n", + "style_param = PipelineParameter(name=\"style\", default_value=\"mosaic\")\n", + "# create a parameter for the number of nodes to use in step no. 
2 (style transfer)\n", + "nodecount_param = PipelineParameter(name=\"nodecount\", default_value=1)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "split_video_step = PythonScriptStep(\n", + " name=\"split video\",\n", + " script_name=\"process_video.py\",\n", + " arguments=[\"--input_video\", orangutan_video,\n", + " \"--output_audio\", ffmpeg_audio,\n", + " \"--output_images\", ffmpeg_images,\n", + " ],\n", + " compute_target=cpu_cluster,\n", + " inputs=[orangutan_video],\n", + " outputs=[ffmpeg_images, ffmpeg_audio],\n", + " runconfig=amlcompute_run_config,\n", + " source_directory=scripts_folder\n", + ")\n", + "\n", + "# create a MPI step for distributing style transfer step across multiple nodes in AmlCompute \n", + "# using 'nodecount_param' PipelineParameter\n", + "distributed_style_transfer_step = MpiStep(\n", + " name=\"mpi style transfer\",\n", + " script_name=\"neural_style_mpi.py\",\n", + " arguments=[\"--content-dir\", ffmpeg_images,\n", + " \"--output-dir\", processed_images,\n", + " \"--model-dir\", models_dir,\n", + " \"--style\", style_param,\n", + " \"--cuda\", 1\n", + " ],\n", + " compute_target=gpu_cluster,\n", + " node_count=nodecount_param, \n", + " process_count_per_node=1,\n", + " inputs=[models_dir, ffmpeg_images],\n", + " outputs=[processed_images],\n", + " pip_packages=[\"mpi4py\", \"torch\", \"torchvision\"],\n", + " runconfig=amlcompute_run_config,\n", + " use_gpu=True,\n", + " source_directory=scripts_folder\n", + ")\n", + "\n", + "stitch_video_step = PythonScriptStep(\n", + " name=\"stitch\",\n", + " script_name=\"stitch_video.py\",\n", + " arguments=[\"--images_dir\", processed_images, \n", + " \"--input_audio\", ffmpeg_audio, \n", + " \"--output_dir\", output_video],\n", + " compute_target=cpu_cluster,\n", + " inputs=[processed_images, ffmpeg_audio],\n", + " outputs=[output_video],\n", + " runconfig=amlcompute_run_config,\n", + " source_directory=scripts_folder\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Run the pipeline" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "pipeline = Pipeline(workspace=ws, steps=[stitch_video_step])\n", + "# submit the pipeline and provide values for the PipelineParameters used in the pipeline\n", + "pipeline_run = Experiment(ws, 'style_transfer').submit(pipeline, pipeline_params={\"style\": \"mosaic\", \"nodecount\": 3})" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Monitor using widget" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from azureml.widgets import RunDetails\n", + "RunDetails(pipeline_run).show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Downloads the video in `output_video` folder" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Download output video" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def download_video(run, target_dir=None):\n", + " stitch_run = run.find_step_run(\"stitch\")[0]\n", + " port_data = stitch_run.get_output_data(\"output_video\")\n", + " port_data.download(target_dir, show_progress=True)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "pipeline_run.wait_for_completion()\n", + "download_video(pipeline_run, 
\"output_video_mosaic\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Publish pipeline" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "published_pipeline = pipeline_run.publish_pipeline(\n", + " name=\"batch score style transfer\", description=\"style transfer\", version=\"1.0\")\n", + "\n", + "published_id = published_pipeline.id" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Re-run pipeline through REST calls for other styles" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Get AAD token" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from azureml.core.authentication import AzureCliAuthentication\n", + "import requests\n", + "\n", + "cli_auth = AzureCliAuthentication()\n", + "aad_token = cli_auth.get_authentication_header()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Get endpoint URL" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "rest_endpoint = published_pipeline.endpoint" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Send request and monitor" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# run the pipeline using PipelineParameter values style='candy' and nodecount=2\n", + "response = requests.post(rest_endpoint, \n", + " headers=aad_token,\n", + " json={\"ExperimentName\": \"style_transfer\",\n", + " \"ParameterAssignments\": {\"style\": \"candy\", \"nodecount\": 2}}) \n", + "run_id = response.json()[\"Id\"]\n", + "\n", + "from azureml.pipeline.core.run import PipelineRun\n", + "published_pipeline_run_candy = PipelineRun(ws.experiments[\"style_transfer\"], run_id)\n", + "\n", + "RunDetails(published_pipeline_run_candy).show()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# run the pipeline using PipelineParameter values style='rain_princess' and nodecount=3\n", + "response = requests.post(rest_endpoint, \n", + " headers=aad_token,\n", + " json={\"ExperimentName\": \"style_transfer\",\n", + " \"ParameterAssignments\": {\"style\": \"rain_princess\", \"nodecount\": 3}}) \n", + "run_id = response.json()[\"Id\"]\n", + "\n", + "from azureml.pipeline.core.run import PipelineRun\n", + "published_pipeline_run_rain = PipelineRun(ws.experiments[\"style_transfer\"], run_id)\n", + "\n", + "RunDetails(published_pipeline_run_rain).show()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# run the pipeline using PipelineParameter values style='udnie' and nodecount=4\n", + "response = requests.post(rest_endpoint, \n", + " headers=aad_token,\n", + " json={\"ExperimentName\": \"style_transfer\",\n", + " \"ParameterAssignments\": {\"style\": \"udnie\", \"nodecount\": 4}}) \n", + "run_id = response.json()[\"Id\"]\n", + "\n", + "from azureml.pipeline.core.run import PipelineRun\n", + "published_pipeline_run_udnie = PipelineRun(ws.experiments[\"style_transfer\"], run_id)\n", + "\n", + "RunDetails(published_pipeline_run_udnie).show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Download output from re-run" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ 
+ "published_pipeline_run_candy.wait_for_completion()\n", + "published_pipeline_run_rain.wait_for_completion()\n", + "published_pipeline_run_udnie.wait_for_completion()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "download_video(published_pipeline_run_candy, target_dir=\"output_video_candy\")\n", + "download_video(published_pipeline_run_rain, target_dir=\"output_video_rain_princess\")\n", + "download_video(published_pipeline_run_udnie, target_dir=\"output_video_udnie\")" + ] + } + ], + "metadata": { + "authors": [ + { + "name": "hichando" + } + ], + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.6.7" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/pipeline/test_adla_script.usql b/pipeline/test_adla_script.usql new file mode 100644 index 000000000..9923ecbaa --- /dev/null +++ b/pipeline/test_adla_script.usql @@ -0,0 +1,12 @@ +CREATE DATABASE IF NOT EXISTS oneboxtest01; + + +@resourcereader = + EXTRACT query string + FROM "@@script_input@@" +USING Extractors.Csv(); + + +OUTPUT @resourcereader +TO "@@script_output@@" +USING Outputters.Csv(); \ No newline at end of file diff --git a/pipeline/testdata.txt b/pipeline/testdata.txt new file mode 100644 index 000000000..e88ded96a --- /dev/null +++ b/pipeline/testdata.txt @@ -0,0 +1 @@ +Test1 \ No newline at end of file diff --git a/pipeline/train-db-dbfs.py b/pipeline/train-db-dbfs.py new file mode 100644 index 000000000..99b511afd --- /dev/null +++ b/pipeline/train-db-dbfs.py @@ -0,0 +1,5 @@ +# Copyright (c) Microsoft. All rights reserved. +# Licensed under the MIT license. + +print("In train.py") +print("As a data scientist, this is where I use my training code.") diff --git a/pipeline/train-db-local.py b/pipeline/train-db-local.py new file mode 100644 index 000000000..99b511afd --- /dev/null +++ b/pipeline/train-db-local.py @@ -0,0 +1,5 @@ +# Copyright (c) Microsoft. All rights reserved. +# Licensed under the MIT license. + +print("In train.py") +print("As a data scientist, this is where I use my training code.") diff --git a/pipeline/train.py b/pipeline/train.py new file mode 100644 index 000000000..961f5ebfd --- /dev/null +++ b/pipeline/train.py @@ -0,0 +1,22 @@ +# Copyright (c) Microsoft. All rights reserved. +# Licensed under the MIT license. + +import argparse +import os + +print("In train.py") +print("As a data scientist, this is where I use my training code.") + +parser = argparse.ArgumentParser("train") + +parser.add_argument("--input_data", type=str, help="input data") +parser.add_argument("--output_train", type=str, help="output_train directory") + +args = parser.parse_args() + +print("Argument 1: %s" % args.input_data) +print("Argument 2: %s" % args.output_train) + +if not (args.output_train is None): + os.makedirs(args.output_train, exist_ok=True) + print("%s created" % args.output_train)