diff --git a/how-to-use-azureml/machine-learning-pipelines/pipeline-batch-scoring/batch_scoring.py b/how-to-use-azureml/machine-learning-pipelines/pipeline-batch-scoring/batch_scoring.py deleted file mode 100644 index dfd135bd6..000000000 --- a/how-to-use-azureml/machine-learning-pipelines/pipeline-batch-scoring/batch_scoring.py +++ /dev/null @@ -1,119 +0,0 @@ -# Copyright (c) Microsoft. All rights reserved. -# Licensed under the MIT license. - -import os -import argparse -import datetime -import time -import tensorflow as tf -from math import ceil -import numpy as np -import shutil -from tensorflow.contrib.slim.python.slim.nets import inception_v3 -from azureml.core.model import Model - -slim = tf.contrib.slim - -parser = argparse.ArgumentParser(description="Start a tensorflow model serving") -parser.add_argument('--model_name', dest="model_name", required=True) -parser.add_argument('--label_dir', dest="label_dir", required=True) -parser.add_argument('--dataset_path', dest="dataset_path", required=True) -parser.add_argument('--output_dir', dest="output_dir", required=True) -parser.add_argument('--batch_size', dest="batch_size", type=int, required=True) - -args = parser.parse_args() - -image_size = 299 -num_channel = 3 - -# create output directory if it does not exist -os.makedirs(args.output_dir, exist_ok=True) - - -def get_class_label_dict(label_file): - label = [] - proto_as_ascii_lines = tf.gfile.GFile(label_file).readlines() - for l in proto_as_ascii_lines: - label.append(l.rstrip()) - return label - - -class DataIterator: - def __init__(self, data_dir): - self.file_paths = [] - image_list = os.listdir(data_dir) - # total_size = len(image_list) - self.file_paths = [data_dir + '/' + file_name.rstrip() for file_name in image_list] - - self.labels = [1 for file_name in self.file_paths] - - @property - def size(self): - return len(self.labels) - - def input_pipeline(self, batch_size): - images_tensor = tf.convert_to_tensor(self.file_paths, dtype=tf.string) - labels_tensor = tf.convert_to_tensor(self.labels, dtype=tf.int64) - input_queue = tf.train.slice_input_producer([images_tensor, labels_tensor], shuffle=False) - labels = input_queue[1] - images_content = tf.read_file(input_queue[0]) - - image_reader = tf.image.decode_jpeg(images_content, channels=num_channel, name="jpeg_reader") - float_caster = tf.cast(image_reader, tf.float32) - new_size = tf.constant([image_size, image_size], dtype=tf.int32) - images = tf.image.resize_images(float_caster, new_size) - images = tf.divide(tf.subtract(images, [0]), [255]) - - image_batch, label_batch = tf.train.batch([images, labels], batch_size=batch_size, capacity=5 * batch_size) - return image_batch - - -def main(_): - # start_time = datetime.datetime.now() - label_file_name = os.path.join(args.label_dir, "labels.txt") - label_dict = get_class_label_dict(label_file_name) - classes_num = len(label_dict) - test_feeder = DataIterator(data_dir=args.dataset_path) - total_size = len(test_feeder.labels) - count = 0 - # get model from model registry - model_path = Model.get_model_path(args.model_name) - with tf.Session() as sess: - test_images = test_feeder.input_pipeline(batch_size=args.batch_size) - with slim.arg_scope(inception_v3.inception_v3_arg_scope()): - input_images = tf.placeholder(tf.float32, [args.batch_size, image_size, image_size, num_channel]) - logits, _ = inception_v3.inception_v3(input_images, - num_classes=classes_num, - is_training=False) - probabilities = tf.argmax(logits, 1) - - sess.run(tf.global_variables_initializer()) - 
sess.run(tf.local_variables_initializer()) - coord = tf.train.Coordinator() - threads = tf.train.start_queue_runners(sess=sess, coord=coord) - saver = tf.train.Saver() - saver.restore(sess, model_path) - out_filename = os.path.join(args.output_dir, "result-labels.txt") - with open(out_filename, "w") as result_file: - i = 0 - while count < total_size and not coord.should_stop(): - test_images_batch = sess.run(test_images) - file_names_batch = test_feeder.file_paths[i * args.batch_size: - min(test_feeder.size, (i + 1) * args.batch_size)] - results = sess.run(probabilities, feed_dict={input_images: test_images_batch}) - new_add = min(args.batch_size, total_size - count) - count += new_add - i += 1 - for j in range(new_add): - result_file.write(os.path.basename(file_names_batch[j]) + ": " + label_dict[results[j]] + "\n") - result_file.flush() - coord.request_stop() - coord.join(threads) - - # copy the file to artifacts - shutil.copy(out_filename, "./outputs/") - # Move the processed data out of the blob so that the next run can process the data. - - -if __name__ == "__main__": - tf.app.run() diff --git a/how-to-use-azureml/machine-learning-pipelines/pipeline-batch-scoring/pipeline-batch-scoring.ipynb b/how-to-use-azureml/machine-learning-pipelines/pipeline-batch-scoring/pipeline-batch-scoring.ipynb deleted file mode 100644 index 86866d76a..000000000 --- a/how-to-use-azureml/machine-learning-pipelines/pipeline-batch-scoring/pipeline-batch-scoring.ipynb +++ /dev/null @@ -1,630 +0,0 @@ -{ - "cells": [ - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Copyright (c) Microsoft Corporation. All rights reserved. \n", - "Licensed under the MIT License." - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "![Impressions](https://PixelServer20190423114238.azurewebsites.net/api/impressions/MachineLearningNotebooks/how-to-use-azureml/machine-learning-pipelines/pipeline-batch-scoring/pipeline-batch-scoring.png)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "**Note**: Azure Machine Learning recently released ParallelRunStep for public preview, this will allow for parallelization of your workload across many compute nodes without the difficulty of orchestrating worker pools and queues. See the [batch inference notebooks](../../../contrib/batch_inferencing/) for examples on how to get started." - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "# Using Azure Machine Learning Pipelines for batch prediction\n", - "\n", - "In this notebook we will demonstrate how to run a batch scoring job using Azure Machine Learning pipelines. Our example job will be to take an already-trained image classification model, and run that model on some unlabeled images. The image classification model that we'll use is the __[Inception-V3 model](https://arxiv.org/abs/1512.00567)__ and we'll run this model on unlabeled images from the __[ImageNet](http://image-net.org/)__ dataset. \n", - "\n", - "The outline of this notebook is as follows:\n", - "\n", - "- Register the pretrained inception model into the model registry. \n", - "- Store the dataset images in a blob container.\n", - "- Use the registered model to do batch scoring on the images in the data blob container." - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Prerequisites\n", - "If you are using an Azure Machine Learning Notebook VM, you are all set. 
Otherwise, make sure you go through the configuration Notebook located at https://github.com/Azure/MachineLearningNotebooks first if you haven't. This sets you up with a working config file that has information on your workspace, subscription id, etc. " - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "from azureml.core import Experiment\n", - "from azureml.core.compute import AmlCompute, ComputeTarget\n", - "from azureml.core.datastore import Datastore\n", - "from azureml.core.runconfig import CondaDependencies, RunConfiguration\n", - "from azureml.data.data_reference import DataReference\n", - "from azureml.pipeline.core import Pipeline, PipelineData\n", - "from azureml.pipeline.steps import PythonScriptStep" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "import os\n", - "from azureml.core import Workspace\n", - "\n", - "ws = Workspace.from_config()\n", - "print('Workspace name: ' + ws.name, \n", - " 'Azure region: ' + ws.location, \n", - " 'Subscription id: ' + ws.subscription_id, \n", - " 'Resource group: ' + ws.resource_group, sep = '\\n')\n" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Set up machine learning resources" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Set up datastores\n", - "First, let\u00e2\u20ac\u2122s access the datastore that has the model, labels, and images. \n", - "\n", - "### Create a datastore that points to a blob container containing sample images\n", - "\n", - "We have created a public blob container `sampledata` on an account named `pipelinedata`, containing images from the ImageNet evaluation set. In the next step, we create a datastore with the name `images_datastore`, which points to this container. In the call to `register_azure_blob_container` below, setting the `overwrite` flag to `True` overwrites any datastore that was created previously with that name. \n", - "\n", - "This step can be changed to point to your blob container by providing your own `datastore_name`, `container_name`, and `account_name`." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "account_name = \"pipelinedata\"\n", - "datastore_name=\"images_datastore\"\n", - "container_name=\"sampledata\"\n", - "\n", - "batchscore_blob = Datastore.register_azure_blob_container(ws, \n", - " datastore_name=datastore_name, \n", - " container_name= container_name, \n", - " account_name=account_name, \n", - " overwrite=True)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Next, let\u00e2\u20ac\u2122s specify the default datastore for the outputs." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "def_data_store = ws.get_default_datastore()" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Configure data references\n", - "Now you need to add references to the data, as inputs to the appropriate pipeline steps in your pipeline. A data source in a pipeline is represented by a DataReference object. The DataReference object points to data that lives in, or is accessible from, a datastore. We need DataReference objects corresponding to the following: the directory containing the input images, the directory in which the pretrained model is stored, the directory containing the labels, and the output directory." 
- ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "input_images = DataReference(datastore=batchscore_blob, \n", - " data_reference_name=\"input_images\",\n", - " path_on_datastore=\"batchscoring/images\",\n", - " mode=\"download\"\n", - " )\n", - "model_dir = DataReference(datastore=batchscore_blob, \n", - " data_reference_name=\"input_model\",\n", - " path_on_datastore=\"batchscoring/models\",\n", - " mode=\"download\" \n", - " )\n", - "label_dir = DataReference(datastore=batchscore_blob, \n", - " data_reference_name=\"input_labels\",\n", - " path_on_datastore=\"batchscoring/labels\",\n", - " mode=\"download\" \n", - " )\n", - "output_dir = PipelineData(name=\"scores\", \n", - " datastore=def_data_store, \n", - " output_path_on_compute=\"batchscoring/results\")" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Create and attach Compute targets\n", - "Use the below code to create and attach Compute targets. " - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# choose a name for your cluster\n", - "aml_compute_name = os.environ.get(\"AML_COMPUTE_NAME\", \"gpu-cluster\")\n", - "cluster_min_nodes = os.environ.get(\"AML_COMPUTE_MIN_NODES\", 0)\n", - "cluster_max_nodes = os.environ.get(\"AML_COMPUTE_MAX_NODES\", 1)\n", - "vm_size = os.environ.get(\"AML_COMPUTE_SKU\", \"STANDARD_NC6\")\n", - "\n", - "\n", - "if aml_compute_name in ws.compute_targets:\n", - " compute_target = ws.compute_targets[aml_compute_name]\n", - " if compute_target and type(compute_target) is AmlCompute:\n", - " print('found compute target. just use it. ' + aml_compute_name)\n", - "else:\n", - " print('creating a new compute target...')\n", - " provisioning_config = AmlCompute.provisioning_configuration(vm_size = vm_size, # NC6 is GPU-enabled\n", - " vm_priority = 'lowpriority', # optional\n", - " min_nodes = cluster_min_nodes, \n", - " max_nodes = cluster_max_nodes)\n", - "\n", - " # create the cluster\n", - " compute_target = ComputeTarget.create(ws, aml_compute_name, provisioning_config)\n", - " \n", - " # can poll for a minimum number of nodes and for a specific timeout. 
\n", - " # if no min node count is provided it will use the scale settings for the cluster\n", - " compute_target.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)\n", - " \n", - " # For a more detailed view of current Azure Machine Learning Compute status, use get_status()\n", - " print(compute_target.get_status().serialize())" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Prepare the Model" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Download the Model\n", - "\n", - "Download and extract the model from http://download.tensorflow.org/models/inception_v3_2016_08_28.tar.gz to `\"models\"`" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# create directory for model\n", - "model_dir = 'models'\n", - "if not os.path.isdir(model_dir):\n", - " os.mkdir(model_dir)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "import tarfile\n", - "import urllib.request\n", - "\n", - "url=\"http://download.tensorflow.org/models/inception_v3_2016_08_28.tar.gz\"\n", - "response = urllib.request.urlretrieve(url, \"model.tar.gz\")\n", - "tar = tarfile.open(\"model.tar.gz\", \"r:gz\")\n", - "tar.extractall(model_dir)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Register the model with Workspace" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "import shutil\n", - "from azureml.core.model import Model\n", - "\n", - "# register downloaded model \n", - "model = Model.register(model_path = \"models/inception_v3.ckpt\",\n", - " model_name = \"inception\", # this is the name the model is registered as\n", - " tags = {'pretrained': \"inception\"},\n", - " description = \"Imagenet trained tensorflow inception\",\n", - " workspace = ws)\n", - "# remove the downloaded dir after registration if you wish\n", - "shutil.rmtree(\"models\")" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Write your scoring script" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "To do the scoring, we use a batch scoring script `batch_scoring.py`, which is located in the same directory that this notebook is in. You can take a look at this script to see how you might modify it for your custom batch scoring task.\n", - "\n", - "The python script `batch_scoring.py` takes input images, applies the image classification model to these images, and outputs a classification result to a results file.\n", - "\n", - "The script `batch_scoring.py` takes the following parameters:\n", - "\n", - "- `--model_name`: the name of the model being used, which is expected to be in the `model_dir` directory\n", - "- `--label_dir` : the directory holding the `labels.txt` file \n", - "- `--dataset_path`: the directory containing the input images\n", - "- `--output_dir` : the script will run the model on the data and output a `results-label.txt` to this directory\n", - "- `--batch_size` : the batch size used in running the model.\n" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Build and run the batch scoring pipeline\n", - "You have everything you need to build the pipeline. Let\u00e2\u20ac\u2122s put all these together." 
- ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Specify the environment to run the script\n", - "Specify the conda dependencies for your script. You will need this object when you create the pipeline step later on." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "from azureml.core.runconfig import DEFAULT_GPU_IMAGE\n", - "\n", - "cd = CondaDependencies.create(pip_packages=[\"tensorflow-gpu==1.13.1\", \"azureml-defaults\"])\n", - "\n", - "# Runconfig\n", - "amlcompute_run_config = RunConfiguration(conda_dependencies=cd)\n", - "amlcompute_run_config.environment.docker.enabled = True\n", - "amlcompute_run_config.environment.docker.base_image = DEFAULT_GPU_IMAGE\n", - "amlcompute_run_config.environment.spark.precache_packages = False" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Specify the parameters for your pipeline\n", - "A subset of the parameters to the python script can be given as input when we re-run a `PublishedPipeline`. In the current example, we define `batch_size` taken by the script as such parameter." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "from azureml.pipeline.core.graph import PipelineParameter\n", - "batch_size_param = PipelineParameter(name=\"param_batch_size\", default_value=20)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Create the pipeline step\n", - "Create the pipeline step using the script, environment configuration, and parameters. Specify the compute target you already attached to your workspace as the target of execution of the script. We will use PythonScriptStep to create the pipeline step." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "inception_model_name = \"inception_v3.ckpt\"\n", - "\n", - "batch_score_step = PythonScriptStep(\n", - " name=\"batch_scoring\",\n", - " script_name=\"batch_scoring.py\",\n", - " arguments=[\"--dataset_path\", input_images, \n", - " \"--model_name\", \"inception\",\n", - " \"--label_dir\", label_dir, \n", - " \"--output_dir\", output_dir, \n", - " \"--batch_size\", batch_size_param],\n", - " compute_target=compute_target,\n", - " inputs=[input_images, label_dir],\n", - " outputs=[output_dir],\n", - " runconfig=amlcompute_run_config\n", - ")" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Run the pipeline\n", - "At this point you can run the pipeline and examine the output it produced. 
" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "tags": [ - "pipelineparameterssample" - ] - }, - "outputs": [], - "source": [ - "pipeline = Pipeline(workspace=ws, steps=[batch_score_step])\n", - "pipeline_run = Experiment(ws, 'batch_scoring').submit(pipeline, pipeline_parameters={\"param_batch_size\": 20})" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Monitor the run" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "from azureml.widgets import RunDetails\n", - "RunDetails(pipeline_run).show()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "pipeline_run.wait_for_completion(show_output=True)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Download and review output" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "step_run = list(pipeline_run.get_children())[0]\n", - "step_run.download_file(\"./outputs/result-labels.txt\")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "import pandas as pd\n", - "df = pd.read_csv(\"result-labels.txt\", delimiter=\":\", header=None)\n", - "df.columns = [\"Filename\", \"Prediction\"]\n", - "df.head()" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Publish a pipeline and rerun using a REST call" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Create a published pipeline\n", - "Once you are satisfied with the outcome of the run, you can publish the pipeline to run it with different input values later. When you publish a pipeline, you will get a REST endpoint that accepts invoking of the pipeline with the set of parameters you have already incorporated above using PipelineParameter." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "published_pipeline = pipeline_run.publish_pipeline(\n", - " name=\"Inception_v3_scoring\", description=\"Batch scoring using Inception v3 model\", version=\"1.0\")\n", - "\n", - "published_pipeline" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Get published pipeline\n", - "\n", - "You can get the published pipeline using **pipeline id**.\n", - "\n", - "To get all the published pipelines for a given workspace(ws): \n", - "```css\n", - "all_pub_pipelines = PublishedPipeline.get_all(ws)\n", - "```" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "from azureml.pipeline.core import PublishedPipeline\n", - "\n", - "pipeline_id = published_pipeline.id # use your published pipeline id\n", - "published_pipeline = PublishedPipeline.get(ws, pipeline_id)\n", - "\n", - "published_pipeline" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Rerun the pipeline using the REST endpoint" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Get AAD token\n", - "[This notebook](https://aka.ms/pl-restep-auth) shows how to authenticate to AML workspace." 
- ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "from azureml.core.authentication import InteractiveLoginAuthentication\n", - "import requests\n", - "\n", - "auth = InteractiveLoginAuthentication()\n", - "aad_token = auth.get_authentication_header()\n" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Run published pipeline" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "rest_endpoint = published_pipeline.endpoint\n", - "# specify batch size when running the pipeline\n", - "response = requests.post(rest_endpoint, \n", - " headers=aad_token, \n", - " json={\"ExperimentName\": \"batch_scoring\",\n", - " \"ParameterAssignments\": {\"param_batch_size\": 50}})" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "try:\n", - " response.raise_for_status()\n", - "except Exception: \n", - " raise Exception('Received bad response from the endpoint: {}\\n'\n", - " 'Response Code: {}\\n'\n", - " 'Headers: {}\\n'\n", - " 'Content: {}'.format(rest_endpoint, response.status_code, response.headers, response.content))\n", - "\n", - "run_id = response.json().get('Id')\n", - "print('Submitted pipeline run: ', run_id)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Monitor the new run" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "from azureml.pipeline.core.run import PipelineRun\n", - "published_pipeline_run = PipelineRun(ws.experiments[\"batch_scoring\"], run_id)\n", - "\n", - "RunDetails(published_pipeline_run).show()" - ] - } - ], - "metadata": { - "authors": [ - { - "name": "sanpil" - } - ], - "kernelspec": { - "display_name": "Python 3.6", - "language": "python", - "name": "python36" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.6.7" - } - }, - "nbformat": 4, - "nbformat_minor": 2 -} \ No newline at end of file diff --git a/how-to-use-azureml/machine-learning-pipelines/pipeline-batch-scoring/pipeline-batch-scoring.yml b/how-to-use-azureml/machine-learning-pipelines/pipeline-batch-scoring/pipeline-batch-scoring.yml deleted file mode 100644 index ac67d296d..000000000 --- a/how-to-use-azureml/machine-learning-pipelines/pipeline-batch-scoring/pipeline-batch-scoring.yml +++ /dev/null @@ -1,7 +0,0 @@ -name: pipeline-batch-scoring -dependencies: -- pip: - - azureml-sdk - - azureml-widgets - - pandas - - requests diff --git a/index.md b/index.md index 3aff0d2d7..012a78230 100644 --- a/index.md +++ b/index.md @@ -12,7 +12,6 @@ Machine Learning notebook samples and encourage efficient retrieval of topics an | [Using Azure ML environments](https://github.com/Azure/MachineLearningNotebooks/blob/master//how-to-use-azureml/training/using-environments/using-environments.ipynb) | Creating and registering environments | None | Local | None | None | None | | [Estimators in AML with hyperparameter tuning](https://github.com/Azure/MachineLearningNotebooks/blob/master//how-to-use-azureml/training-with-deep-learning/how-to-use-estimator/how-to-use-estimator.ipynb) | Use the Estimator pattern in Azure Machine Learning SDK | None | AML Compute | None | None | None | - ## 
Tutorials |Title| Task | Dataset | Training Compute | Deployment Target | ML Framework | Tags | @@ -50,7 +49,6 @@ Machine Learning notebook samples and encourage efficient retrieval of topics an | :star:[Azure Machine Learning Pipelines with Data Dependency](https://github.com/Azure/MachineLearningNotebooks/blob/master//how-to-use-azureml/machine-learning-pipelines/intro-to-pipelines/aml-pipelines-with-data-dependency-steps.ipynb) | Demonstrates how to construct a Pipeline with data dependency between steps | Custom | AML Compute | None | Azure ML | None | | [How to use run a notebook as a step in AML Pipelines](https://github.com/Azure/MachineLearningNotebooks/blob/master//how-to-use-azureml/machine-learning-pipelines/intro-to-pipelines/aml-pipelines-with-notebook-runner-step.ipynb) | Demonstrates the use of NotebookRunnerStep | Custom | AML Compute | None | Azure ML | None | - ## Training |Title| Task | Dataset | Training Compute | Deployment Target | ML Framework | Tags | @@ -78,7 +76,6 @@ Machine Learning notebook samples and encourage efficient retrieval of topics an | [Use MLflow with AML for a remote training run](https://github.com/Azure/MachineLearningNotebooks/blob/master//how-to-use-azureml/track-and-monitor-experiments/using-mlflow/train-remote/train-remote.ipynb) | Use MLflow tracking APIs together with AML for storing your metrics and artifacts | Diabetes | AML Compute | None | None | None | - ## Deployment @@ -94,7 +91,6 @@ Machine Learning notebook samples and encourage efficient retrieval of topics an | [Register Spark model and deploy as webservice](https://github.com/Azure/MachineLearningNotebooks/blob/master//how-to-use-azureml/deployment/spark/model-register-and-deploy-spark.ipynb) | | Iris | None | Azure Container Instance | PySpark | | - ## Other Notebooks |Title| Task | Dataset | Training Compute | Deployment Target | ML Framework | Tags | |:----|:-----|:-------:|:----------------:|:-----------------:|:------------:|:------------:| @@ -129,7 +125,6 @@ Machine Learning notebook samples and encourage efficient retrieval of topics an | [train-explain-model-on-amlcompute-and-deploy](https://github.com/Azure/MachineLearningNotebooks/blob/master//how-to-use-azureml/explain-model/azure-integration/scoring-time/train-explain-model-on-amlcompute-and-deploy.ipynb) | | | | | | | | [training_notebook](https://github.com/Azure/MachineLearningNotebooks/blob/master//how-to-use-azureml/machine-learning-pipelines/intro-to-pipelines/notebook_runner/training_notebook.ipynb) | | | | | | | | [nyc-taxi-data-regression-model-building](https://github.com/Azure/MachineLearningNotebooks/blob/master//how-to-use-azureml/machine-learning-pipelines/nyc-taxi-data-regression-model-building/nyc-taxi-data-regression-model-building.ipynb) | | | | | | | -| [pipeline-batch-scoring](https://github.com/Azure/MachineLearningNotebooks/blob/master//how-to-use-azureml/machine-learning-pipelines/pipeline-batch-scoring/pipeline-batch-scoring.ipynb) | | | | | | | | [authentication-in-azureml](https://github.com/Azure/MachineLearningNotebooks/blob/master//how-to-use-azureml/manage-azureml-service/authentication-in-azureml/authentication-in-azureml.ipynb) | | | | | | | | [Logging APIs](https://github.com/Azure/MachineLearningNotebooks/blob/master//how-to-use-azureml/track-and-monitor-experiments/logging-api/logging-api.ipynb) | Logging APIs and analyzing results | None | None 
| None | None | None | | [distributed-cntk-with-custom-docker](https://github.com/Azure/MachineLearningNotebooks/blob/master//how-to-use-azureml/training-with-deep-learning/distributed-cntk-with-custom-docker/distributed-cntk-with-custom-docker.ipynb) | | | | | | | @@ -139,5 +134,4 @@ Machine Learning notebook samples and encourage efficient retrieval of topics an | [img-classification-part2-deploy](https://github.com/Azure/MachineLearningNotebooks/blob/master//tutorials/img-classification-part2-deploy.ipynb) | | | | | | | | [regression-automated-ml](https://github.com/Azure/MachineLearningNotebooks/blob/master//tutorials/regression-automated-ml.ipynb) | | | | | | | | [tutorial-1st-experiment-sdk-train](https://github.com/Azure/MachineLearningNotebooks/blob/master//tutorials/tutorial-1st-experiment-sdk-train.ipynb) | | | | | | | -| [tutorial-pipeline-batch-scoring-classification](https://github.com/Azure/MachineLearningNotebooks/blob/master//tutorials/tutorial-pipeline-batch-scoring-classification.ipynb) | | | | | | | - +| [tutorial-pipeline-batch-scoring-classification](https://github.com/Azure/MachineLearningNotebooks/blob/master//tutorials/machine-learning-pipelines-advanced/tutorial-pipeline-batch-scoring-classification.ipynb) | | | | | | | diff --git a/tutorials/machine-learning-pipelines-advanced/scripts/batch_scoring.py b/tutorials/machine-learning-pipelines-advanced/scripts/batch_scoring.py new file mode 100644 index 000000000..3b5e3dbcf --- /dev/null +++ b/tutorials/machine-learning-pipelines-advanced/scripts/batch_scoring.py @@ -0,0 +1,83 @@ +# Copyright (c) Microsoft. All rights reserved. +# Licensed under the MIT license. + +import os +import argparse +import datetime +import time +import tensorflow as tf +from math import ceil +import numpy as np +import shutil +from tensorflow.contrib.slim.python.slim.nets import inception_v3 + +from azureml.core import Run +from azureml.core.model import Model +from azureml.core.dataset import Dataset + +slim = tf.contrib.slim + +image_size = 299 +num_channel = 3 + + +def get_class_label_dict(): + label = [] + proto_as_ascii_lines = tf.gfile.GFile("labels.txt").readlines() + for l in proto_as_ascii_lines: + label.append(l.rstrip()) + return label + + +def init(): + global g_tf_sess, probabilities, label_dict, input_images + + parser = argparse.ArgumentParser(description="Start a tensorflow model serving") + parser.add_argument('--model_name', dest="model_name", required=True) + parser.add_argument('--labels_name', dest="labels_name", required=True) + args, _ = parser.parse_known_args() + + workspace = Run.get_context(allow_offline=False).experiment.workspace + label_ds = Dataset.get_by_name(workspace=workspace, name=args.labels_name) + label_ds.download(target_path='.', overwrite=True) + + label_dict = get_class_label_dict() + classes_num = len(label_dict) + + with slim.arg_scope(inception_v3.inception_v3_arg_scope()): + input_images = tf.placeholder(tf.float32, [1, image_size, image_size, num_channel]) + logits, _ = inception_v3.inception_v3(input_images, + num_classes=classes_num, + is_training=False) + probabilities = tf.argmax(logits, 1) + + config = tf.ConfigProto() + config.gpu_options.allow_growth = True + g_tf_sess = tf.Session(config=config) + g_tf_sess.run(tf.global_variables_initializer()) + g_tf_sess.run(tf.local_variables_initializer()) + + model_path = Model.get_model_path(args.model_name) + saver = tf.train.Saver() + saver.restore(g_tf_sess, 
model_path) + + +def file_to_tensor(file_path): + image_string = tf.read_file(file_path) + image = tf.image.decode_image(image_string, channels=3) + + image.set_shape([None, None, None]) + image = tf.image.resize_images(image, [image_size, image_size]) + image = tf.divide(tf.subtract(image, [0]), [255]) + image.set_shape([image_size, image_size, num_channel]) + return image + + +def run(mini_batch): + result_list = [] + for file_path in mini_batch: + test_image = file_to_tensor(file_path) + out = g_tf_sess.run(test_image) + result = g_tf_sess.run(probabilities, feed_dict={input_images: [out]}) + result_list.append(os.path.basename(file_path) + ": " + label_dict[result[0]]) + return result_list diff --git a/tutorials/tutorial-pipeline-batch-scoring-classification.ipynb b/tutorials/machine-learning-pipelines-advanced/tutorial-pipeline-batch-scoring-classification.ipynb similarity index 67% rename from tutorials/tutorial-pipeline-batch-scoring-classification.ipynb rename to tutorials/machine-learning-pipelines-advanced/tutorial-pipeline-batch-scoring-classification.ipynb index b0b42b8c3..06ca5b767 100644 --- a/tutorials/tutorial-pipeline-batch-scoring-classification.ipynb +++ b/tutorials/machine-learning-pipelines-advanced/tutorial-pipeline-batch-scoring-classification.ipynb @@ -15,19 +15,16 @@ "![Impressions](https://PixelServer20190423114238.azurewebsites.net/api/impressions/MachineLearningNotebooks/how-to-use-azureml/machine-learning-pipelines/pipeline-batch-scoring/pipeline-batch-scoring.png)" ] }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "**Note**: Azure Machine Learning recently released ParallelRunStep for public preview, this will allow for parallelization of your workload across many compute nodes without the difficulty of orchestrating worker pools and queues. See the [batch inference notebooks](../contrib/batch_inferencing/) for examples on how to get started." - ] - }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Use Azure Machine Learning Pipelines for batch prediction\n", "\n", + "## Note\n", + "This notebook uses public preview functionality (ParallelRunStep). Please install azureml-contrib-pipeline-steps package before running this notebook.\n", + "\n", + "\n", "In this tutorial, you use Azure Machine Learning service pipelines to run a batch scoring image classification job. The example job uses the pre-trained [Inception-V3](https://arxiv.org/abs/1512.00567) CNN (convolutional neural network) Tensorflow model to classify unlabeled images. Machine learning pipelines optimize your workflow with speed, portability, and reuse so you can focus on your expertise, machine learning, rather than on infrastructure and automation. 
After building and publishing a pipeline, you can configure a REST endpoint to enable triggering the pipeline from any HTTP library on any platform.\n", "\n", "\n", @@ -37,6 +34,7 @@ "> * Create data objects to fetch and output data\n", "> * Download, prepare, and register the model to your workspace\n", "> * Provision compute targets and create a scoring script\n", + "> * Use ParallelRunStep to do batch scoring\n", "> * Build, run, and publish a pipeline\n", "> * Enable a REST endpoint for the pipeline\n", "\n", @@ -111,14 +109,14 @@ "source": [ "## Create data objects\n", "\n", - "When building pipelines, `DataReference` objects are used for reading data from workspace datastores, and `PipelineData` objects are used for transferring intermediate data between pipeline steps.\n", + "When building pipelines, `Dataset` objects are used for reading data from workspace datastores, and `PipelineData` objects are used for transferring intermediate data between pipeline steps.\n", "\n", "This batch scoring example only uses one pipeline step, but in use-cases with multiple steps, the typical flow will include:\n", "\n", - "1. Using `DataReference` objects as **inputs** to fetch raw data, performing some transformations, then **outputting** a `PipelineData` object.\n", + "1. Using `Dataset` objects as **inputs** to fetch raw data, performing some transformations, then **outputting** a `PipelineData` object.\n", "1. Use the previous step's `PipelineData` **output object** as an *input object*, repeated for subsequent steps.\n", "\n", - "For this scenario you create `DataReference` objects corresponding to the datastore directories for both the input images and the classification labels (y-test values). You also create a `PipelineData` object for the batch scoring output data." + "For this scenario you create `Dataset` objects corresponding to the datastore directories for both the input images and the classification labels (y-test values). You also create a `PipelineData` object for the batch scoring output data." ] }, { @@ -127,21 +125,11 @@ "metadata": {}, "outputs": [], "source": [ - "from azureml.data.data_reference import DataReference\n", + "from azureml.core.dataset import Dataset\n", "from azureml.pipeline.core import PipelineData\n", "\n", - "input_images = DataReference(datastore=batchscore_blob, \n", - " data_reference_name=\"input_images\",\n", - " path_on_datastore=\"batchscoring/images\",\n", - " mode=\"download\"\n", - " )\n", - "\n", - "label_dir = DataReference(datastore=batchscore_blob, \n", - " data_reference_name=\"input_labels\",\n", - " path_on_datastore=\"batchscoring/labels\",\n", - " mode=\"download\" \n", - " )\n", - "\n", + "input_images = Dataset.File.from_files((batchscore_blob, \"batchscoring/images/\"))\n", + "label_ds = Dataset.File.from_files((batchscore_blob, \"batchscoring/labels/*.txt\"))\n", "output_dir = PipelineData(name=\"scores\", \n", " datastore=def_data_store, \n", " output_path_on_compute=\"batchscoring/results\")" @@ -150,6 +138,25 @@ { "cell_type": "markdown", "metadata": {}, + "source": [ + "Next, we need to register the datasets with the workspace." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "input_images = input_images.register(workspace = ws, name = \"input_images\")\n", + "label_ds = label_ds.register(workspace = ws, name = \"label_ds\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], "source": [ "## Download and register the model" ] @@ -192,13 +199,17 @@ "metadata": {}, "outputs": [], "source": [ + "import shutil\n", "from azureml.core.model import Model\n", - " \n", + "\n", + "# register downloaded model \n", "model = Model.register(model_path=\"models/inception_v3.ckpt\",\n", " model_name=\"inception\",\n", " tags={\"pretrained\": \"inception\"},\n", " description=\"Imagenet trained tensorflow inception\",\n", - " workspace=ws)" + " workspace=ws)\n", + "# remove the downloaded dir after registration if you wish\n", + "shutil.rmtree(\"models\")" ] }, { @@ -244,142 +255,16 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "To do the scoring, you create a batch scoring script `batch_scoring.py`, and write it to the current directory. The script takes input images, applies the classification model, and outputs the predictions to a results file.\n", + "To do the scoring, you create a batch scoring script `batch_scoring.py`, and write it to the current directory. The script takes a minibatch of input images, applies the classification model, and outputs the predictions to a results file.\n", "\n", - "The script `batch_scoring.py` takes the following parameters, which get passed from the `PythonScriptStep` that you create later:\n", + "The script `batch_scoring.py` takes the following parameters, which get passed from the `ParallelRunStep` that you create later:\n", "\n", "- `--model_name`: the name of the model being used\n", - "- `--label_dir` : the directory holding the `labels.txt` file \n", - "- `--dataset_path`: the directory containing the input images\n", - "- `--output_dir` : the script will run the model on the data and output a `results-label.txt` to this directory\n", - "- `--batch_size` : the batch size used in running the model\n", + "- `--labels_name` : the name of the `Dataset` holding the `labels.txt` file \n", "\n", "The pipelines infrastructure uses the `ArgumentParser` class to pass parameters into pipeline steps. For example, in the code below the first argument `--model_name` is given the property identifier `model_name`. In the `main()` function, this property is accessed using `Model.get_model_path(args.model_name)`." 
] }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "%%writefile batch_scoring.py\n", - "\n", - "import os\n", - "import argparse\n", - "import datetime\n", - "import time\n", - "import tensorflow as tf\n", - "from math import ceil\n", - "import numpy as np\n", - "import shutil\n", - "from tensorflow.contrib.slim.python.slim.nets import inception_v3\n", - "from azureml.core.model import Model\n", - "\n", - "slim = tf.contrib.slim\n", - "\n", - "parser = argparse.ArgumentParser(description=\"Start a tensorflow model serving\")\n", - "parser.add_argument('--model_name', dest=\"model_name\", required=True)\n", - "parser.add_argument('--label_dir', dest=\"label_dir\", required=True)\n", - "parser.add_argument('--dataset_path', dest=\"dataset_path\", required=True)\n", - "parser.add_argument('--output_dir', dest=\"output_dir\", required=True)\n", - "parser.add_argument('--batch_size', dest=\"batch_size\", type=int, required=True)\n", - "\n", - "args = parser.parse_args()\n", - "\n", - "image_size = 299\n", - "num_channel = 3\n", - "\n", - "# create output directory if it does not exist\n", - "os.makedirs(args.output_dir, exist_ok=True)\n", - "\n", - "\n", - "def get_class_label_dict(label_file):\n", - " label = []\n", - " proto_as_ascii_lines = tf.gfile.GFile(label_file).readlines()\n", - " for l in proto_as_ascii_lines:\n", - " label.append(l.rstrip())\n", - " return label\n", - "\n", - "\n", - "class DataIterator:\n", - " def __init__(self, data_dir):\n", - " self.file_paths = []\n", - " image_list = os.listdir(data_dir)\n", - " self.file_paths = [data_dir + '/' + file_name.rstrip() for file_name in image_list]\n", - "\n", - " self.labels = [1 for file_name in self.file_paths]\n", - "\n", - " @property\n", - " def size(self):\n", - " return len(self.labels)\n", - "\n", - " def input_pipeline(self, batch_size):\n", - " images_tensor = tf.convert_to_tensor(self.file_paths, dtype=tf.string)\n", - " labels_tensor = tf.convert_to_tensor(self.labels, dtype=tf.int64)\n", - " input_queue = tf.train.slice_input_producer([images_tensor, labels_tensor], shuffle=False)\n", - " labels = input_queue[1]\n", - " images_content = tf.read_file(input_queue[0])\n", - "\n", - " image_reader = tf.image.decode_jpeg(images_content, channels=num_channel, name=\"jpeg_reader\")\n", - " float_caster = tf.cast(image_reader, tf.float32)\n", - " new_size = tf.constant([image_size, image_size], dtype=tf.int32)\n", - " images = tf.image.resize_images(float_caster, new_size)\n", - " images = tf.divide(tf.subtract(images, [0]), [255])\n", - "\n", - " image_batch, label_batch = tf.train.batch([images, labels], batch_size=batch_size, capacity=5 * batch_size)\n", - " return image_batch\n", - "\n", - "\n", - "def main(_):\n", - " label_file_name = os.path.join(args.label_dir, \"labels.txt\")\n", - " label_dict = get_class_label_dict(label_file_name)\n", - " classes_num = len(label_dict)\n", - " test_feeder = DataIterator(data_dir=args.dataset_path)\n", - " total_size = len(test_feeder.labels)\n", - " count = 0\n", - " \n", - " # get model from model registry\n", - " model_path = Model.get_model_path(args.model_name)\n", - " \n", - " with tf.Session() as sess:\n", - " test_images = test_feeder.input_pipeline(batch_size=args.batch_size)\n", - " with slim.arg_scope(inception_v3.inception_v3_arg_scope()):\n", - " input_images = tf.placeholder(tf.float32, [args.batch_size, image_size, image_size, num_channel])\n", - " logits, _ = inception_v3.inception_v3(input_images,\n", - " 
num_classes=classes_num,\n", - " is_training=False)\n", - " probabilities = tf.argmax(logits, 1)\n", - "\n", - " sess.run(tf.global_variables_initializer())\n", - " sess.run(tf.local_variables_initializer())\n", - " coord = tf.train.Coordinator()\n", - " threads = tf.train.start_queue_runners(sess=sess, coord=coord)\n", - " saver = tf.train.Saver()\n", - " saver.restore(sess, model_path)\n", - " out_filename = os.path.join(args.output_dir, \"result-labels.txt\")\n", - " with open(out_filename, \"w\") as result_file:\n", - " i = 0\n", - " while count < total_size and not coord.should_stop():\n", - " test_images_batch = sess.run(test_images)\n", - " file_names_batch = test_feeder.file_paths[i * args.batch_size:\n", - " min(test_feeder.size, (i + 1) * args.batch_size)]\n", - " results = sess.run(probabilities, feed_dict={input_images: test_images_batch})\n", - " new_add = min(args.batch_size, total_size - count)\n", - " count += new_add\n", - " i += 1\n", - " for j in range(new_add):\n", - " result_file.write(os.path.basename(file_names_batch[j]) + \": \" + label_dict[results[j]] + \"\\n\")\n", - " result_file.flush()\n", - " coord.request_stop()\n", - " coord.join(threads)\n", - "\n", - " shutil.copy(out_filename, \"./outputs/\")\n", - "\n", - "if __name__ == \"__main__\":\n", - " tf.app.run()" - ] - }, { "cell_type": "markdown", "metadata": {}, @@ -407,26 +292,23 @@ "metadata": {}, "outputs": [], "source": [ + "from azureml.core import Environment\n", + "from azureml.core.conda_dependencies import CondaDependencies\n", "from azureml.core.runconfig import DEFAULT_GPU_IMAGE\n", - "from azureml.core.runconfig import CondaDependencies, RunConfiguration\n", "\n", "cd = CondaDependencies.create(pip_packages=[\"tensorflow-gpu==1.13.1\", \"azureml-defaults\"])\n", "\n", - "amlcompute_run_config = RunConfiguration(conda_dependencies=cd)\n", - "amlcompute_run_config.environment.docker.enabled = True\n", - "amlcompute_run_config.environment.docker.base_image = DEFAULT_GPU_IMAGE\n", - "amlcompute_run_config.environment.spark.precache_packages = False" + "env = Environment(name=\"parallelenv\")\n", + "env.python.conda_dependencies=cd\n", + "env.docker.base_image = DEFAULT_GPU_IMAGE" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "### Parameterize the pipeline\n", - "\n", - "Define a custom parameter for the pipeline to control the batch size. After the pipeline has been published and exposed via a REST endpoint, any configured parameters are also exposed and can be specified in the JSON payload when rerunning the pipeline with an HTTP request.\n", - "\n", - "Create a `PipelineParameter` object to enable this behavior, and define a name and default value." + "### Create the configuration to wrap the inference script\n", + "Create the configuration to run the inference script, specifying the environment, the batch settings, and the compute target you already attached to your workspace as the target of execution of the script. We will pass this `ParallelRunConfig` to the `ParallelRunStep` that creates the pipeline step later on."
] }, { @@ -435,8 +317,19 @@ "metadata": {}, "outputs": [], "source": [ - "from azureml.pipeline.core.graph import PipelineParameter\n", - "batch_size_param = PipelineParameter(name=\"param_batch_size\", default_value=20)" + "from azureml.contrib.pipeline.steps import ParallelRunConfig\n", + "\n", + "parallel_run_config = ParallelRunConfig(\n", + " environment=env,\n", + " entry_script=\"batch_scoring.py\",\n", + " source_directory=\"scripts\",\n", + " output_action=\"append_row\",\n", + " mini_batch_size=\"20\",\n", + " error_threshold=1,\n", + " compute_target=compute_target,\n", + " process_count_per_node=2,\n", + " node_count=1\n", + ")" ] }, { @@ -452,7 +345,7 @@ "* input and output data, and any custom parameters\n", "* reference to a script or SDK-logic to run during the step\n", "\n", - "There are multiple classes that inherit from the parent class [`PipelineStep`](https://docs.microsoft.com/python/api/azureml-pipeline-core/azureml.pipeline.core.builder.pipelinestep?view=azure-ml-py) to assist with building a step using certain frameworks and stacks. In this example, you use the [`PythonScriptStep`](https://docs.microsoft.com/python/api/azureml-pipeline-steps/azureml.pipeline.steps.python_script_step.pythonscriptstep?view=azure-ml-py) class to define your step logic using a custom python script. Note that if an argument to your script is either an input to the step or output of the step, it must be defined **both** in the `arguments` array, **as well as** in either the `input` or `output` parameter, respectively. \n", + "There are multiple classes that inherit from the parent class [`PipelineStep`](https://docs.microsoft.com/python/api/azureml-pipeline-core/azureml.pipeline.core.builder.pipelinestep?view=azure-ml-py) to assist with building a step using certain frameworks and stacks. In this example, you use the [`ParallelRunStep`](https://docs.microsoft.com/en-us/python/api/azureml-contrib-pipeline-steps/azureml.contrib.pipeline.steps.parallelrunstep?view=azure-ml-py) class to define your step logic using a scoring script. \n", "\n", "An object reference in the `outputs` array becomes available as an **input** for a subsequent pipeline step, for scenarios where there is more than one step." 
] @@ -463,20 +356,20 @@ "metadata": {}, "outputs": [], "source": [ - "from azureml.pipeline.steps import PythonScriptStep\n", + "from azureml.contrib.pipeline.steps import ParallelRunStep\n", + "from datetime import datetime\n", "\n", - "batch_score_step = PythonScriptStep(\n", - " name=\"batch_scoring\",\n", - " script_name=\"batch_scoring.py\",\n", - " arguments=[\"--dataset_path\", input_images, \n", - " \"--model_name\", \"inception\",\n", - " \"--label_dir\", label_dir, \n", - " \"--output_dir\", output_dir, \n", - " \"--batch_size\", batch_size_param],\n", - " compute_target=compute_target,\n", - " inputs=[input_images, label_dir],\n", - " outputs=[output_dir],\n", - " runconfig=amlcompute_run_config\n", + "parallel_step_name = \"batchscoring-\" + datetime.now().strftime(\"%Y%m%d%H%M\")\n", + "\n", + "batch_score_step = ParallelRunStep(\n", + " name=parallel_step_name,\n", + " inputs=[input_images.as_named_input(\"input_images\")],\n", + " output=output_dir,\n", + " models=[model],\n", + " arguments=[\"--model_name\", \"inception\",\n", + " \"--labels_name\", \"label_ds\"],\n", + " parallel_run_config=parallel_run_config,\n", + " allow_reuse=False\n", ")" ] }, @@ -510,7 +403,7 @@ "from azureml.pipeline.core import Pipeline\n", "\n", "pipeline = Pipeline(workspace=ws, steps=[batch_score_step])\n", - "pipeline_run = Experiment(ws, 'batch_scoring').submit(pipeline, pipeline_parameters={\"param_batch_size\": 20})\n", + "pipeline_run = Experiment(ws, \"batch_scoring\").submit(pipeline)\n", "pipeline_run.wait_for_completion(show_output=True)" ] }, @@ -534,14 +427,20 @@ "metadata": {}, "outputs": [], "source": [ - "import pandas as pd\n", + "batch_run = next(pipeline_run.get_children())\n", + "batch_output = batch_run.get_output_data(\"scores\")\n", + "batch_output.download(local_path=\"inception_results\")\n", "\n", - "step_run = list(pipeline_run.get_children())[0]\n", - "step_run.download_file(\"./outputs/result-labels.txt\")\n", + "import pandas as pd\n", + "for root, dirs, files in os.walk(\"inception_results\"):\n", + " for file in files:\n", + " if file.endswith(\"parallel_run_step.txt\"):\n", + " result_file = os.path.join(root,file)\n", "\n", - "df = pd.read_csv(\"result-labels.txt\", delimiter=\":\", header=None)\n", + "df = pd.read_csv(result_file, delimiter=\":\", header=None)\n", "df.columns = [\"Filename\", \"Prediction\"]\n", - "df.head(10)" + "print(\"Prediction has \", df.shape[0], \" rows\")\n", + "df.head(10) " ] }, { @@ -599,7 +498,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "Get the REST url from the `endpoint` property of the published pipeline object. You can also find the REST url in your workspace in the portal. Build an HTTP POST request to the endpoint, specifying your authentication header. Additionally, add a JSON payload object with the experiment name and the batch size parameter. As a reminder, the `param_batch_size` is passed through to your `batch_scoring.py` script because you defined it as a `PipelineParameter` object in the step configuration.\n", + "Get the REST url from the `endpoint` property of the published pipeline object. You can also find the REST url in your workspace in the portal. Build an HTTP POST request to the endpoint, specifying your authentication header. Additionally, add a JSON payload object with the experiment name and the batch size parameter. 
As a reminder, the `process_count_per_node` value is passed through to `ParallelRunStep` because it is exposed as a `PipelineParameter` in the step configuration.\n", "\n", "Make the request to trigger the run. Access the `Id` key from the response dict to get the value of the run id." ] }, @@ -616,8 +515,25 @@ "response = requests.post(rest_endpoint, \n", " headers=auth_header, \n", " json={\"ExperimentName\": \"batch_scoring\",\n", - " \"ParameterAssignments\": {\"param_batch_size\": 50}})\n", - "run_id = response.json()[\"Id\"]" + " \"ParameterAssignments\": {\"process_count_per_node\": 6}})" ] }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "try:\n", + " response.raise_for_status()\n", + "except Exception: \n", + " raise Exception(\"Received bad response from the endpoint: {}\\n\"\n", + " \"Response Code: {}\\n\"\n", + " \"Headers: {}\\n\"\n", + " \"Content: {}\".format(rest_endpoint, response.status_code, response.headers, response.content))\n", + "\n", + "run_id = response.json().get('Id')\n", + "print('Submitted pipeline run: ', run_id)" ] }, { "cell_type": "markdown", @@ -652,7 +568,8 @@ "\n", "If you used a cloud notebook server, stop the VM when you are not using it to reduce cost.\n", "\n", - "1. In your workspace, select **Notebook VMs**.\n", + "1. In your workspace, select **Compute**.\n", + "1. Select the **Notebook VMs** tab in the compute page.\n", "1. From the list, select the VM.\n", "1. Select **Stop**.\n", "1. When you're ready to use the server again, select **Start**.\n", @@ -683,19 +600,16 @@ "\n", "See the [how-to](https://docs.microsoft.com/azure/machine-learning/service/how-to-create-your-first-pipeline?view=azure-devops) for additional detail on building pipelines with the machine learning SDK." ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [] } ], "metadata": { "authors": [ { - "name": "sanpil" + "name": [ + "sanpil", + "trmccorm", + "pansav" + ] } ], "kernelspec": { diff --git a/tutorials/tutorial-pipeline-batch-scoring-classification.yml b/tutorials/machine-learning-pipelines-advanced/tutorial-pipeline-batch-scoring-classification.yml similarity index 82% rename from tutorials/tutorial-pipeline-batch-scoring-classification.yml rename to tutorials/machine-learning-pipelines-advanced/tutorial-pipeline-batch-scoring-classification.yml index bb6402691..1e896b846 100644 --- a/tutorials/tutorial-pipeline-batch-scoring-classification.yml +++ b/tutorials/machine-learning-pipelines-advanced/tutorial-pipeline-batch-scoring-classification.yml @@ -3,7 +3,7 @@ dependencies: - pip: - azureml-sdk - azureml-pipeline-core - - azureml-pipeline-steps + - azureml-contrib-pipeline-steps - pandas - requests - azureml-widgets
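The new tutorial assembles its pieces in this order: register the sample datastore, build `Dataset` inputs, wrap `scripts/batch_scoring.py` in a `ParallelRunConfig`, and submit a single `ParallelRunStep` pipeline. Below is a condensed sketch of that flow, assuming a local workspace `config.json`, an existing `gpu-cluster` AmlCompute target, the registered `inception` model, and the preview `azureml-contrib-pipeline-steps` package; all other names mirror the notebook, and the step name here is illustrative.

```python
# Condensed sketch of the batch-scoring flow built with ParallelRunStep (public preview).
# Assumes: workspace config.json present, "gpu-cluster" AmlCompute and the "inception"
# model already exist, and azureml-contrib-pipeline-steps is installed.
from azureml.core import Environment, Experiment, Workspace
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.dataset import Dataset
from azureml.core.datastore import Datastore
from azureml.core.model import Model
from azureml.core.runconfig import DEFAULT_GPU_IMAGE
from azureml.contrib.pipeline.steps import ParallelRunConfig, ParallelRunStep
from azureml.pipeline.core import Pipeline, PipelineData

ws = Workspace.from_config()
compute_target = ws.compute_targets["gpu-cluster"]

# Public sample data used by the tutorial; scores land in the default datastore.
batchscore_blob = Datastore.register_azure_blob_container(
    ws, datastore_name="images_datastore", container_name="sampledata",
    account_name="pipelinedata", overwrite=True)
input_images = Dataset.File.from_files((batchscore_blob, "batchscoring/images/"))
input_images = input_images.register(workspace=ws, name="input_images")
label_ds = Dataset.File.from_files((batchscore_blob, "batchscoring/labels/*.txt"))
label_ds = label_ds.register(workspace=ws, name="label_ds")
output_dir = PipelineData(name="scores", datastore=ws.get_default_datastore(),
                          output_path_on_compute="batchscoring/results")

# GPU environment for the scoring script.
cd = CondaDependencies.create(pip_packages=["tensorflow-gpu==1.13.1", "azureml-defaults"])
env = Environment(name="parallelenv")
env.python.conda_dependencies = cd
env.docker.base_image = DEFAULT_GPU_IMAGE

# Wrap scripts/batch_scoring.py (init()/run() entry points) and fan the images out
# across the cluster in mini-batches of 20 files.
parallel_run_config = ParallelRunConfig(
    environment=env, entry_script="batch_scoring.py", source_directory="scripts",
    output_action="append_row", mini_batch_size="20", error_threshold=1,
    compute_target=compute_target, process_count_per_node=2, node_count=1)

batch_score_step = ParallelRunStep(
    name="batchscoring-sketch",  # illustrative; the notebook appends a timestamp
    inputs=[input_images.as_named_input("input_images")],
    output=output_dir,
    models=[Model(ws, "inception")],
    arguments=["--model_name", "inception", "--labels_name", "label_ds"],
    parallel_run_config=parallel_run_config,
    allow_reuse=False)

pipeline = Pipeline(workspace=ws, steps=[batch_score_step])
pipeline_run = Experiment(ws, "batch_scoring").submit(pipeline)
pipeline_run.wait_for_completion(show_output=True)
```

With `output_action="append_row"`, the per-image predictions are appended to a single `parallel_run_step.txt` file under the `scores` output, which is the file the notebook locates and loads with pandas after the run completes.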