From 83995e80c56599cb14f391cdab9abf8d1a0ea2ea Mon Sep 17 00:00:00 2001 From: Hongye Sun Date: Thu, 14 Mar 2019 16:53:13 -0700 Subject: [PATCH] Add documentation and sample notebook for dataproc components. --- .../google/dataproc/_submit_spark_job.py | 4 +- .../gcp/dataproc/create_cluster/README.md | 112 ++++++ .../gcp/dataproc/create_cluster/sample.ipynb | 207 +++++++++++ .../gcp/dataproc/delete_cluster/README.md | 103 ++++++ .../gcp/dataproc/delete_cluster/sample.ipynb | 205 +++++++++++ .../gcp/dataproc/submit_hadoop_job/README.md | 176 +++++++++ .../dataproc/submit_hadoop_job/sample.ipynb | 336 ++++++++++++++++++ .../gcp/dataproc/submit_hive_job/README.md | 137 +++++++ .../gcp/dataproc/submit_hive_job/sample.ipynb | 232 ++++++++++++ .../gcp/dataproc/submit_pig_job/README.md | 129 +++++++ .../gcp/dataproc/submit_pig_job/sample.ipynb | 226 ++++++++++++ .../gcp/dataproc/submit_pyspark_job/README.md | 122 +++++++ .../dataproc/submit_pyspark_job/sample.ipynb | 221 ++++++++++++ .../gcp/dataproc/submit_spark_job/README.md | 124 +++++++ .../dataproc/submit_spark_job/component.yaml | 4 +- .../dataproc/submit_spark_job/sample.ipynb | 219 ++++++++++++ .../dataproc/submit_sparksql_job/README.md | 142 ++++++++ .../dataproc/submit_sparksql_job/sample.ipynb | 239 +++++++++++++ 18 files changed, 2932 insertions(+), 6 deletions(-) create mode 100644 components/gcp/dataproc/create_cluster/README.md create mode 100644 components/gcp/dataproc/create_cluster/sample.ipynb create mode 100644 components/gcp/dataproc/delete_cluster/README.md create mode 100644 components/gcp/dataproc/delete_cluster/sample.ipynb create mode 100644 components/gcp/dataproc/submit_hadoop_job/README.md create mode 100644 components/gcp/dataproc/submit_hadoop_job/sample.ipynb create mode 100644 components/gcp/dataproc/submit_hive_job/README.md create mode 100644 components/gcp/dataproc/submit_hive_job/sample.ipynb create mode 100644 components/gcp/dataproc/submit_pig_job/README.md create mode 100644 components/gcp/dataproc/submit_pig_job/sample.ipynb create mode 100644 components/gcp/dataproc/submit_pyspark_job/README.md create mode 100644 components/gcp/dataproc/submit_pyspark_job/sample.ipynb create mode 100644 components/gcp/dataproc/submit_spark_job/README.md create mode 100644 components/gcp/dataproc/submit_spark_job/sample.ipynb create mode 100644 components/gcp/dataproc/submit_sparksql_job/README.md create mode 100644 components/gcp/dataproc/submit_sparksql_job/sample.ipynb diff --git a/component_sdk/python/kfp_component/google/dataproc/_submit_spark_job.py b/component_sdk/python/kfp_component/google/dataproc/_submit_spark_job.py index eaba3d71f9e..93fc17614c3 100644 --- a/component_sdk/python/kfp_component/google/dataproc/_submit_spark_job.py +++ b/component_sdk/python/kfp_component/google/dataproc/_submit_spark_job.py @@ -25,9 +25,7 @@ def submit_spark_job(project_id, region, cluster_name, region (str): Required. The Cloud Dataproc region in which to handle the request. cluster_name (str): Required. The cluster to run the job. - main_jar_file_uri (str): The name of the driver's main class. The jar file - that contains the class must be in the default CLASSPATH or specified - in jarFileUris. + main_jar_file_uri (str): The HCFS URI of the jar file that contains the main class. main_class (str): The name of the driver's main class. The jar file that contains the class must be in the default CLASSPATH or specified in jarFileUris. 
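For reference, a minimal sketch of how the corrected parameter is meant to be used (this assumes `submit_spark_job` is re-exported from the `kfp_component.google.dataproc` package, and every project, cluster, and jar name below is a placeholder):

```python
from kfp_component.google.dataproc import submit_spark_job

# Placeholder values; either `main_jar_file_uri` (the HCFS URI of the driver jar)
# or `main_class` (with the jar on the default CLASSPATH or in jarFileUris)
# identifies the Spark driver, as the docstring above describes.
submit_spark_job(
    'my-project',     # project_id
    'us-central1',    # region
    'my-cluster',     # cluster_name
    main_jar_file_uri='gs://my-bucket/jars/spark-wordcount.jar',
)
```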
diff --git a/components/gcp/dataproc/create_cluster/README.md b/components/gcp/dataproc/create_cluster/README.md new file mode 100644 index 00000000000..74b82d6af93 --- /dev/null +++ b/components/gcp/dataproc/create_cluster/README.md @@ -0,0 +1,112 @@ + +# Dataproc - Create Cluster + +## Intended Use +A Kubeflow Pipeline component to create a cluster in Google Cloud Dataproc service. + +## Run-Time Parameters: +Name | Description +:--- | :---------- +project_id | Required. The ID of the Google Cloud Platform project that the cluster belongs to. +region | Required. The Cloud Dataproc region in which to handle the request. +name | Optional. The cluster name. Cluster names within a project must be unique. Names of deleted clusters can be reused. +name_prefix | Optional. The prefix of the cluster name. +initialization_actions | Optional. List of GCS URIs of executables to execute on each node after config is completed. By default, executables are run on master and all worker nodes. +config_bucket | Optional. A Google Cloud Storage bucket used to stage job dependencies, config files, and job driver console output. +image_version | Optional. The version of software inside the cluster. +cluster | Optional. The full [cluster config](https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.clusters#Cluster) +wait_interval | The wait seconds between polling the operation. Defaults to 30s. + +## Output: +Name | Description +:--- | :---------- +cluster_name | The cluster name of the created cluster. + +## Sample + +Note: the sample code below works in both IPython notebook or python code directly. + +### Set sample parameters + + +```python +# Required Parameters +PROJECT_ID = '' + +# Optional Parameters +EXPERIMENT_NAME = 'Dataproc - Create Cluster' +COMPONENT_SPEC_URI = 'https://raw.githubusercontent.com/kubeflow/pipelines/e5b0081cdcbef6a056c0da114d2eb81ab8d8152d/components/gcp/dataproc/create_cluster/component.yaml' +``` + +### Install KFP SDK +Install the SDK (Uncomment the code if the SDK is not installed before) + + +```python +#KFP_PACKAGE = 'https://storage.googleapis.com/ml-pipeline/release/0.1.12/kfp.tar.gz' +#!pip3 install $KFP_PACKAGE --upgrade +``` + +### Load component definitions + + +```python +import kfp.components as comp + +dataproc_create_cluster_op = comp.load_component_from_url(COMPONENT_SPEC_URI) +display(dataproc_create_cluster_op) +``` + +### Here is an illustrative pipeline that uses the component + + +```python +import kfp.dsl as dsl +import kfp.gcp as gcp +import json +@dsl.pipeline( + name='Dataproc create cluster pipeline', + description='Dataproc create cluster pipeline' +) +def dataproc_create_cluster_pipeline( + project_id = PROJECT_ID, + region = 'us-central1', + name='', + name_prefix='', + job_name_prefix='', + initialization_actions='', + config_bucket='', + image_version='', + cluster='', + wait_interval='30' +): + dataproc_create_cluster_op(project_id, region, name, name_prefix, job_name_prefix, initialization_actions, + config_bucket, image_version, cluster, wait_interval).apply(gcp.use_gcp_secret('user-gcp-sa')) +``` + +### Compile the pipeline + + +```python +pipeline_func = dataproc_create_cluster_pipeline +pipeline_filename = pipeline_func.__name__ + '.pipeline.tar.gz' +import kfp.compiler as compiler +compiler.Compiler().compile(pipeline_func, pipeline_filename) +``` + +### Submit the pipeline for execution + + +```python +#Specify pipeline argument values +arguments = {} + +#Get or create an experiment and submit a pipeline run +import kfp 
+client = kfp.Client() +experiment = client.create_experiment(EXPERIMENT_NAME) + +#Submit a pipeline run +run_name = pipeline_func.__name__ + ' run' +run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename, arguments) +``` diff --git a/components/gcp/dataproc/create_cluster/sample.ipynb b/components/gcp/dataproc/create_cluster/sample.ipynb new file mode 100644 index 00000000000..f4fc573d367 --- /dev/null +++ b/components/gcp/dataproc/create_cluster/sample.ipynb @@ -0,0 +1,207 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Dataproc - Create Cluster\n", + "\n", + "## Intended Use\n", + "A Kubeflow Pipeline component to create a cluster in Google Cloud Dataproc service. \n", + "\n", + "## Run-Time Parameters:\n", + "Name | Description\n", + ":--- | :----------\n", + "project_id | Required. The ID of the Google Cloud Platform project that the cluster belongs to.\n", + "region | Required. The Cloud Dataproc region in which to handle the request.\n", + "name | Optional. The cluster name. Cluster names within a project must be unique. Names of deleted clusters can be reused.\n", + "name_prefix | Optional. The prefix of the cluster name.\n", + "initialization_actions | Optional. List of GCS URIs of executables to execute on each node after config is completed. By default, executables are run on master and all worker nodes. \n", + "config_bucket | Optional. A Google Cloud Storage bucket used to stage job dependencies, config files, and job driver console output.\n", + "image_version | Optional. The version of software inside the cluster.\n", + "cluster | Optional. The full [cluster config](https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.clusters#Cluster)\n", + "wait_interval | The wait seconds between polling the operation. Defaults to 30s.\n", + "\n", + "## Output:\n", + "Name | Description\n", + ":--- | :----------\n", + "cluster_name | The cluster name of the created cluster." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Sample\n", + "\n", + "Note: the sample code below works in both IPython notebook or python code directly." 
+ ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Set sample parameters" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "tags": [ + "parameters" + ] + }, + "outputs": [], + "source": [ + "# Required Parameters\n", + "PROJECT_ID = ''\n", + "\n", + "# Optional Parameters\n", + "EXPERIMENT_NAME = 'Dataproc - Create Cluster'\n", + "COMPONENT_SPEC_URI = 'https://raw.githubusercontent.com/kubeflow/pipelines/e5b0081cdcbef6a056c0da114d2eb81ab8d8152d/components/gcp/dataproc/create_cluster/component.yaml'" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Install KFP SDK\n", + "Install the SDK (Uncomment the code if the SDK is not installed before)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "scrolled": true + }, + "outputs": [], + "source": [ + "#KFP_PACKAGE = 'https://storage.googleapis.com/ml-pipeline/release/0.1.12/kfp.tar.gz'\n", + "#!pip3 install $KFP_PACKAGE --upgrade" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Load component definitions" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import kfp.components as comp\n", + "\n", + "dataproc_create_cluster_op = comp.load_component_from_url(COMPONENT_SPEC_URI)\n", + "display(dataproc_create_cluster_op)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Here is an illustrative pipeline that uses the component" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import kfp.dsl as dsl\n", + "import kfp.gcp as gcp\n", + "import json\n", + "@dsl.pipeline(\n", + " name='Dataproc create cluster pipeline',\n", + " description='Dataproc create cluster pipeline'\n", + ")\n", + "def dataproc_create_cluster_pipeline(\n", + " project_id = PROJECT_ID, \n", + " region = 'us-central1', \n", + " name='', \n", + " name_prefix='', \n", + " job_name_prefix='', \n", + " initialization_actions='', \n", + " config_bucket='', \n", + " image_version='', \n", + " cluster='', \n", + " wait_interval='30'\n", + "):\n", + " dataproc_create_cluster_op(project_id, region, name, name_prefix, job_name_prefix, initialization_actions, \n", + " config_bucket, image_version, cluster, wait_interval).apply(gcp.use_gcp_secret('user-gcp-sa'))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Compile the pipeline" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "pipeline_func = dataproc_create_cluster_pipeline\n", + "pipeline_filename = pipeline_func.__name__ + '.pipeline.tar.gz'\n", + "import kfp.compiler as compiler\n", + "compiler.Compiler().compile(pipeline_func, pipeline_filename)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Submit the pipeline for execution" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "#Specify pipeline argument values\n", + "arguments = {}\n", + "\n", + "#Get or create an experiment and submit a pipeline run\n", + "import kfp\n", + "client = kfp.Client()\n", + "experiment = client.create_experiment(EXPERIMENT_NAME)\n", + "\n", + "#Submit a pipeline run\n", + "run_name = pipeline_func.__name__ + ' run'\n", + "run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename, arguments)" + ] + } + ], + "metadata": { + "kernelspec": { + 
"display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.6.4" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/components/gcp/dataproc/delete_cluster/README.md b/components/gcp/dataproc/delete_cluster/README.md new file mode 100644 index 00000000000..b54b80bc9ba --- /dev/null +++ b/components/gcp/dataproc/delete_cluster/README.md @@ -0,0 +1,103 @@ + +# Dataproc - Delete Cluster + +## Intended Use +A Kubeflow Pipeline component to delete a cluster in Google Cloud Dataproc service. + +## Run-Time Parameters: +Name | Description +:--- | :---------- +project_id | Required. The ID of the Google Cloud Platform project that the cluster belongs to. +region | Required. The Cloud Dataproc region in which to handle the request. +name | Required. The cluster name to delete. +wait_interval | The wait seconds between polling the operation. Defaults to 30s. + +## Sample + +Note: the sample code below works in both IPython notebook or python code directly. + +### Prerequisites + +Before running the sample code, you need to [create a Dataproc cluster](https://cloud.google.com/dataproc/docs/guides/create-cluster). + +### Set sample parameters + + +```python +PROJECT_ID = '' +CLUSTER_NAME = '' + +REGION = 'us-central1' +EXPERIMENT_NAME = 'Dataproc - Delete Cluster' +COMPONENT_SPEC_URI = 'https://raw.githubusercontent.com/kubeflow/pipelines/e5b0081cdcbef6a056c0da114d2eb81ab8d8152d/components/gcp/dataproc/delete_cluster/component.yaml' +``` + +### Install KFP SDK +Install the SDK (Uncomment the code if the SDK is not installed before) + + +```python +#KFP_PACKAGE = 'https://storage.googleapis.com/ml-pipeline/release/0.1.12/kfp.tar.gz' +#!pip3 install $KFP_PACKAGE --upgrade +``` + +### Load component definitions + + +```python +import kfp.components as comp + +dataproc_delete_cluster_op = comp.load_component_from_url(COMPONENT_SPEC_URI) +display(dataproc_delete_cluster_op) +``` + +### Here is an illustrative pipeline that uses the component + + +```python +import kfp.dsl as dsl +import kfp.gcp as gcp +import json +@dsl.pipeline( + name='Dataproc delete cluster pipeline', + description='Dataproc delete cluster pipeline' +) +def dataproc_delete_cluster_pipeline( + project_id = PROJECT_ID, + region = REGION, + name = CLUSTER_NAME +): + dataproc_delete_cluster_op(project_id, region, name).apply(gcp.use_gcp_secret('user-gcp-sa')) +``` + +### Compile the pipeline + + +```python +pipeline_func = dataproc_delete_cluster_pipeline +pipeline_filename = pipeline_func.__name__ + '.pipeline.tar.gz' +import kfp.compiler as compiler +compiler.Compiler().compile(pipeline_func, pipeline_filename) +``` + +### Submit the pipeline for execution + + +```python +#Specify pipeline argument values +arguments = {} + +#Get or create an experiment and submit a pipeline run +import kfp +client = kfp.Client() +experiment = client.create_experiment(EXPERIMENT_NAME) + +#Submit a pipeline run +run_name = pipeline_func.__name__ + ' run' +run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename, arguments) +``` + + +```python + +``` diff --git a/components/gcp/dataproc/delete_cluster/sample.ipynb b/components/gcp/dataproc/delete_cluster/sample.ipynb new file mode 100644 index 00000000000..7e8aae7d840 --- /dev/null +++ 
b/components/gcp/dataproc/delete_cluster/sample.ipynb @@ -0,0 +1,205 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Dataproc - Delete Cluster\n", + "\n", + "## Intended Use\n", + "A Kubeflow Pipeline component to delete a cluster in Google Cloud Dataproc service. \n", + "\n", + "## Run-Time Parameters:\n", + "Name | Description\n", + ":--- | :----------\n", + "project_id | Required. The ID of the Google Cloud Platform project that the cluster belongs to.\n", + "region | Required. The Cloud Dataproc region in which to handle the request.\n", + "name | Required. The cluster name to delete.\n", + "wait_interval | The wait seconds between polling the operation. Defaults to 30s." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Sample\n", + "\n", + "Note: the sample code below works in both IPython notebook or python code directly." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Prerequisites\n", + "\n", + "Before running the sample code, you need to [create a Dataproc cluster](https://cloud.google.com/dataproc/docs/guides/create-cluster)." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Set sample parameters" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "tags": [ + "parameters" + ] + }, + "outputs": [], + "source": [ + "PROJECT_ID = ''\n", + "CLUSTER_NAME = ''\n", + "\n", + "REGION = 'us-central1'\n", + "EXPERIMENT_NAME = 'Dataproc - Delete Cluster'\n", + "COMPONENT_SPEC_URI = 'https://raw.githubusercontent.com/kubeflow/pipelines/e5b0081cdcbef6a056c0da114d2eb81ab8d8152d/components/gcp/dataproc/delete_cluster/component.yaml'" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Install KFP SDK\n", + "Install the SDK (Uncomment the code if the SDK is not installed before)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "scrolled": true + }, + "outputs": [], + "source": [ + "#KFP_PACKAGE = 'https://storage.googleapis.com/ml-pipeline/release/0.1.12/kfp.tar.gz'\n", + "#!pip3 install $KFP_PACKAGE --upgrade" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Load component definitions" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import kfp.components as comp\n", + "\n", + "dataproc_delete_cluster_op = comp.load_component_from_url(COMPONENT_SPEC_URI)\n", + "display(dataproc_delete_cluster_op)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Here is an illustrative pipeline that uses the component" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import kfp.dsl as dsl\n", + "import kfp.gcp as gcp\n", + "import json\n", + "@dsl.pipeline(\n", + " name='Dataproc delete cluster pipeline',\n", + " description='Dataproc delete cluster pipeline'\n", + ")\n", + "def dataproc_delete_cluster_pipeline(\n", + " project_id = PROJECT_ID, \n", + " region = REGION,\n", + " name = CLUSTER_NAME\n", + "):\n", + " dataproc_delete_cluster_op(project_id, region, name).apply(gcp.use_gcp_secret('user-gcp-sa'))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Compile the pipeline" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "pipeline_func = dataproc_delete_cluster_pipeline\n", + "pipeline_filename = 
pipeline_func.__name__ + '.pipeline.tar.gz'\n", + "import kfp.compiler as compiler\n", + "compiler.Compiler().compile(pipeline_func, pipeline_filename)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Submit the pipeline for execution" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "#Specify pipeline argument values\n", + "arguments = {}\n", + "\n", + "#Get or create an experiment and submit a pipeline run\n", + "import kfp\n", + "client = kfp.Client()\n", + "experiment = client.create_experiment(EXPERIMENT_NAME)\n", + "\n", + "#Submit a pipeline run\n", + "run_name = pipeline_func.__name__ + ' run'\n", + "run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename, arguments)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.6.4" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/components/gcp/dataproc/submit_hadoop_job/README.md b/components/gcp/dataproc/submit_hadoop_job/README.md new file mode 100644 index 00000000000..1d616924a37 --- /dev/null +++ b/components/gcp/dataproc/submit_hadoop_job/README.md @@ -0,0 +1,176 @@ + +# Dataproc - Submit Hadoop Job + +## Intended Use +A Kubeflow Pipeline component to submit a Apache Hadoop MapReduce job on Apache Hadoop YARN in Google Cloud Dataproc service. + +## Run-Time Parameters: +Name | Description +:--- | :---------- +project_id | Required. The ID of the Google Cloud Platform project that the cluster belongs to. +region | Required. The Cloud Dataproc region in which to handle the request. +cluster_name | Required. The cluster to run the job. +main_jar_file_uri | The HCFS URI of the jar file containing the main class. Examples: `gs://foo-bucket/analytics-binaries/extract-useful-metrics-mr.jar` `hdfs:/tmp/test-samples/custom-wordcount.jar` `file:///home/usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar` +main_class | The name of the driver's main class. The jar file that contains the class must be in the default CLASSPATH or specified in jarFileUris. +args | Optional. The arguments to pass to the driver. Do not include arguments, such as -libjars or -Dfoo=bar, that can be set as job properties, since a collision may occur that causes an incorrect job submission. +hadoop_job | Optional. The full payload of a [HadoopJob](https://cloud.google.com/dataproc/docs/reference/rest/v1/HadoopJob). +job | Optional. The full payload of a [Dataproc job](https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.jobs). +wait_interval | Optional. The wait seconds between polling the operation. Defaults to 30s. + +## Output: +Name | Description +:--- | :---------- +job_id | The ID of the created job. + +## Sample + +Note: the sample code below works in both IPython notebook or python code directly. + +### Setup a Dataproc cluster +Follow the [guide](https://cloud.google.com/dataproc/docs/guides/create-cluster) to create a new Dataproc cluster or reuse an existing one. + +### Prepare Hadoop job +Upload your Hadoop jar file to a Google Cloud Storage (GCS) bucket. 
In the sample, we will use a jar file that is pre-installed in the main cluster, so there is no need to provide the `main_jar_file_uri`. We only set `main_class` to be `org.apache.hadoop.examples.WordCount`. + +Here is the [source code of the example](https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/WordCount.java). + +To package a self-contained Hadoop MapReduce application from source code, follow the [instructions](https://hadoop.apache.org/docs/current/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduceTutorial.html). + +### Set sample parameters + + +```python +PROJECT_ID = '' +CLUSTER_NAME = '' +OUTPUT_GCS_PATH = '' +REGION = 'us-central1' +MAIN_CLASS = 'org.apache.hadoop.examples.WordCount' +INTPUT_GCS_PATH = 'gs://ml-pipeline-playground/shakespeare1.txt' +EXPERIMENT_NAME = 'Dataproc - Submit Hadoop Job' +COMPONENT_SPEC_URI = 'https://raw.githubusercontent.com/kubeflow/pipelines/7622e57666c17088c94282ccbe26d6a52768c226/components/gcp/dataproc/submit_hadoop_job/component.yaml' +``` + +### Inspect Input Data +The input file is a simple text file: + + +```python +!gsutil cat $INTPUT_GCS_PATH +``` + + With which he yoketh your rebellious necks Razeth your cities and subverts your towns And in a moment makes them desolate + + +### Clean up existing output files (Optional) +This step is needed because the sample requires the output folder to be empty. +To continue to run the sample, make sure that the service account of the notebook server has access to `OUTPUT_GCS_PATH`. + +**CAUTION**: This will remove all blob files under `OUTPUT_GCS_PATH`. + + +```python +!gsutil rm $OUTPUT_GCS_PATH/** +``` + + CommandException: No URLs matched: gs://hongyes-ml-tests/dataproc/hadoop/output/** + + +### Install KFP SDK +Install the SDK (uncomment the following code if the SDK is not already installed) + + +```python +# KFP_PACKAGE = 'https://storage.googleapis.com/ml-pipeline/release/0.1.12/kfp.tar.gz' +# !pip3 install $KFP_PACKAGE --upgrade +``` + +### Load component definitions + + +```python +import kfp.components as comp + +dataproc_submit_hadoop_job_op = comp.load_component_from_url(COMPONENT_SPEC_URI) +display(dataproc_submit_hadoop_job_op) +``` + + + + + +### Here is an illustrative pipeline that uses the component + + +```python +import kfp.dsl as dsl +import kfp.gcp as gcp +import json +@dsl.pipeline( + name='Dataproc submit Hadoop job pipeline', + description='Dataproc submit Hadoop job pipeline' +) +def dataproc_submit_hadoop_job_pipeline( + project_id = PROJECT_ID, + region = REGION, + cluster_name = CLUSTER_NAME, + main_jar_file_uri = '', + main_class = MAIN_CLASS, + args = json.dumps([ + INTPUT_GCS_PATH, + OUTPUT_GCS_PATH + ]), + hadoop_job='', + job='{}', + wait_interval='30' +): + dataproc_submit_hadoop_job_op(project_id, region, cluster_name, main_jar_file_uri, main_class, + args, hadoop_job, job, wait_interval).apply(gcp.use_gcp_secret('user-gcp-sa')) + +``` + +### Compile the pipeline + + +```python +pipeline_func = dataproc_submit_hadoop_job_pipeline +pipeline_filename = pipeline_func.__name__ + '.pipeline.tar.gz' +import kfp.compiler as compiler +compiler.Compiler().compile(pipeline_func, pipeline_filename) +``` + +### Submit the pipeline for execution + + +```python +#Specify pipeline argument values +arguments = {} + +#Get or create an experiment and submit a pipeline run +import kfp +client = kfp.Client() +experiment = client.create_experiment(EXPERIMENT_NAME) + +#Submit a pipeline run 
+run_name = pipeline_func.__name__ + ' run' +run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename, arguments) +``` + + +Experiment link here + + + +Run link here + + +### Inspect the outputs + +The sample in the notebook will count the words in the input text and output them in sharded files. Here is the command to inspect them: + + +```python +!gsutil cat $OUTPUT_GCS_PATH/* +``` + + AccessDeniedException: 403 + diff --git a/components/gcp/dataproc/submit_hadoop_job/sample.ipynb b/components/gcp/dataproc/submit_hadoop_job/sample.ipynb new file mode 100644 index 00000000000..d0974da89ba --- /dev/null +++ b/components/gcp/dataproc/submit_hadoop_job/sample.ipynb @@ -0,0 +1,336 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Dataproc - Submit Hadoop Job\n", + "\n", + "## Intended Use\n", + "A Kubeflow Pipeline component to submit a Apache Hadoop MapReduce job on Apache Hadoop YARN in Google Cloud Dataproc service. \n", + "\n", + "## Run-Time Parameters:\n", + "Name | Description\n", + ":--- | :----------\n", + "project_id | Required. The ID of the Google Cloud Platform project that the cluster belongs to.\n", + "region | Required. The Cloud Dataproc region in which to handle the request.\n", + "cluster_name | Required. The cluster to run the job.\n", + "main_jar_file_uri | The HCFS URI of the jar file containing the main class. Examples: `gs://foo-bucket/analytics-binaries/extract-useful-metrics-mr.jar` `hdfs:/tmp/test-samples/custom-wordcount.jar` `file:///home/usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar`\n", + "main_class | The name of the driver's main class. The jar file that contains the class must be in the default CLASSPATH or specified in jarFileUris. \n", + "args | Optional. The arguments to pass to the driver. Do not include arguments, such as -libjars or -Dfoo=bar, that can be set as job properties, since a collision may occur that causes an incorrect job submission.\n", + "hadoop_job | Optional. The full payload of a [HadoopJob](https://cloud.google.com/dataproc/docs/reference/rest/v1/HadoopJob).\n", + "job | Optional. The full payload of a [Dataproc job](https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.jobs).\n", + "wait_interval | Optional. The wait seconds between polling the operation. Defaults to 30s.\n", + "\n", + "## Output:\n", + "Name | Description\n", + ":--- | :----------\n", + "job_id | The ID of the created job." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Sample\n", + "\n", + "Note: the sample code below works in both IPython notebook or python code directly.\n", + "\n", + "### Setup a Dataproc cluster\n", + "Follow the [guide](https://cloud.google.com/dataproc/docs/guides/create-cluster) to create a new Dataproc cluster or reuse an existing one.\n", + "\n", + "### Prepare Hadoop job\n", + "Upload your Hadoop jar file to a Google Cloud Storage (GCS) bucket. In the sample, we will use a jar file that is pre-installed in the main cluster, so there is no need to provide the `main_jar_file_uri`. 
We only set `main_class` to be `org.apache.hadoop.examples.WordCount`.\n", + "\n", + "Here is the [source code of example](https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/WordCount.java).\n", + "\n", + "To package a self-contained Hadoop MapReduct application from source code, follow the [instructions](https://hadoop.apache.org/docs/current/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduceTutorial.html)." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Set sample parameters" + ] + }, + { + "cell_type": "code", + "execution_count": 59, + "metadata": { + "tags": [ + "parameters" + ] + }, + "outputs": [], + "source": [ + "PROJECT_ID = ''\n", + "CLUSTER_NAME = ''\n", + "OUTPUT_GCS_PATH = ''\n", + "REGION = 'us-central1'\n", + "MAIN_CLASS = 'org.apache.hadoop.examples.WordCount'\n", + "INTPUT_GCS_PATH = 'gs://ml-pipeline-playground/shakespeare1.txt'\n", + "EXPERIMENT_NAME = 'Dataproc - Submit Hadoop Job'\n", + "COMPONENT_SPEC_URI = 'https://raw.githubusercontent.com/kubeflow/pipelines/7622e57666c17088c94282ccbe26d6a52768c226/components/gcp/dataproc/submit_hadoop_job/component.yaml'" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Insepct Input Data\n", + "The input file is a simple text file:" + ] + }, + { + "cell_type": "code", + "execution_count": 61, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "With which he yoketh your rebellious necks Razeth your cities and subverts your towns And in a moment makes them desolate\r\n" + ] + } + ], + "source": [ + "!gsutil cat $INTPUT_GCS_PATH" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Clean up existing output files (Optional)\n", + "This is needed because the sample code requires the output folder to be a clean folder.\n", + "To continue to run the sample, make sure that the service account of the notebook server has access to the `OUTPUT_GCS_PATH`.\n", + "\n", + "**CAUTION**: This will remove all blob files under `OUTPUT_GCS_PATH`." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 62, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "CommandException: No URLs matched: gs://hongyes-ml-tests/dataproc/hadoop/output/**\r\n" + ] + } + ], + "source": [ + "!gsutil rm $OUTPUT_GCS_PATH/**" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Install KFP SDK\n", + "Install the SDK (Uncomment the code if the SDK is not installed before)" + ] + }, + { + "cell_type": "code", + "execution_count": 63, + "metadata": { + "scrolled": true + }, + "outputs": [], + "source": [ + "# KFP_PACKAGE = 'https://storage.googleapis.com/ml-pipeline/release/0.1.12/kfp.tar.gz'\n", + "# !pip3 install $KFP_PACKAGE --upgrade" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Load component definitions" + ] + }, + { + "cell_type": "code", + "execution_count": 64, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "import kfp.components as comp\n", + "\n", + "dataproc_submit_hadoop_job_op = comp.load_component_from_url(COMPONENT_SPEC_URI)\n", + "display(dataproc_submit_hadoop_job_op)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Here is an illustrative pipeline that uses the component" + ] + }, + { + "cell_type": "code", + "execution_count": 65, + "metadata": {}, + "outputs": [], + "source": [ + "import kfp.dsl as dsl\n", + "import kfp.gcp as gcp\n", + "import json\n", + "@dsl.pipeline(\n", + " name='Dataproc submit Hadoop job pipeline',\n", + " description='Dataproc submit Hadoop job pipeline'\n", + ")\n", + "def dataproc_submit_hadoop_job_pipeline(\n", + " project_id = PROJECT_ID, \n", + " region = REGION,\n", + " cluster_name = CLUSTER_NAME,\n", + " main_jar_file_uri = '',\n", + " main_class = MAIN_CLASS,\n", + " args = json.dumps([\n", + " INTPUT_GCS_PATH,\n", + " OUTPUT_GCS_PATH\n", + " ]), \n", + " hadoop_job='', \n", + " job='{}', \n", + " wait_interval='30'\n", + "):\n", + " dataproc_submit_hadoop_job_op(project_id, region, cluster_name, main_jar_file_uri, main_class,\n", + " args, hadoop_job, job, wait_interval).apply(gcp.use_gcp_secret('user-gcp-sa'))\n", + " " + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Compile the pipeline" + ] + }, + { + "cell_type": "code", + "execution_count": 66, + "metadata": {}, + "outputs": [], + "source": [ + "pipeline_func = dataproc_submit_hadoop_job_pipeline\n", + "pipeline_filename = pipeline_func.__name__ + '.pipeline.tar.gz'\n", + "import kfp.compiler as compiler\n", + "compiler.Compiler().compile(pipeline_func, pipeline_filename)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Submit the pipeline for execution" + ] + }, + { + "cell_type": "code", + "execution_count": 67, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "Experiment link here" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "Run link here" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "#Specify pipeline argument values\n", + "arguments = {}\n", + "\n", + "#Get or create an experiment and submit a pipeline run\n", + "import kfp\n", + "client = kfp.Client()\n", + "experiment = client.create_experiment(EXPERIMENT_NAME)\n", + "\n", + "#Submit a pipeline run\n", + "run_name = 
pipeline_func.__name__ + ' run'\n", + "run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename, arguments)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Inspect the outputs\n", + "\n", + "The sample in the notebook will count the words in the input text and output them in sharded files. Here is the command to inspect them:" + ] + }, + { + "cell_type": "code", + "execution_count": 70, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "AccessDeniedException: 403 \r\n" + ] + } + ], + "source": [ + "!gsutil cat $OUTPUT_GCS_PATH/*" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.6.4" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/components/gcp/dataproc/submit_hive_job/README.md b/components/gcp/dataproc/submit_hive_job/README.md new file mode 100644 index 00000000000..2f5057f3262 --- /dev/null +++ b/components/gcp/dataproc/submit_hive_job/README.md @@ -0,0 +1,137 @@ + +# Dataproc - Submit Hive Job + +## Intended Use +A Kubeflow Pipeline component to submit a Hive job on YARN in Google Cloud Dataproc service. + +## Run-Time Parameters: +Name | Description +:--- | :---------- +project_id | Required. The ID of the Google Cloud Platform project that the cluster belongs to. +region | Required. The Cloud Dataproc region in which to handle the request. +cluster_name | Required. The cluster to run the job. +queries | Required. The queries to execute. You do not need to terminate a query with a semicolon. Multiple queries can be specified in one string by separating each with a semicolon. +query_file_uri | The HCFS URI of the script that contains Hive queries. +script_variables | Optional. Mapping of query variable names to values (equivalent to the Hive command: SET name="value";). +hive_job | Optional. The full payload of a [HiveJob](https://cloud.google.com/dataproc/docs/reference/rest/v1/HiveJob) +job | Optional. The full payload of a [Dataproc job](https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.jobs). +wait_interval | Optional. The wait seconds between polling the operation. Defaults to 30s. + +## Output: +Name | Description +:--- | :---------- +job_id | The ID of the created job. + +## Sample + +Note: the sample code below works in both IPython notebook or python code directly. + +### Setup a Dataproc cluster +Follow the [guide](https://cloud.google.com/dataproc/docs/guides/create-cluster) to create a new Dataproc cluster or reuse an existing one. + +### Prepare Hive query +Directly put your Hive queries in the `queries` list or upload your Hive queries into a file to a Google Cloud Storage (GCS) bucket and place the path in `query_file_uri`. In this sample, we will use a hard coded query in the `queries` list to select data from a public CSV file from GCS. 
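If you prefer the `query_file_uri` route, the sketch below shows one way to stage a query file (assuming the `gsutil` CLI is installed and authenticated in the notebook environment; the bucket and file names are placeholders):

```python
# Write a Hive script locally, stage it in GCS, and pass its URI through
# `query_file_uri` instead of populating the `queries` list.
QUERY_FILE_URI = 'gs://your-bucket/hive/queries.q'  # placeholder location

with open('queries.q', 'w') as f:
    f.write('SHOW DATABASES;\nSHOW TABLES;')  # any Hive statements

!gsutil cp queries.q $QUERY_FILE_URI
```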
+ +For more details, please check out the [Hive language manual](https://cwiki.apache.org/confluence/display/Hive/LanguageManual). + +### Set sample parameters + + +```python +PROJECT_ID = '' +CLUSTER_NAME = '' +REGION = 'us-central1' +QUERY = ''' +DROP TABLE IF EXISTS natality_csv; +CREATE EXTERNAL TABLE natality_csv ( + source_year BIGINT, year BIGINT, month BIGINT, day BIGINT, wday BIGINT, + state STRING, is_male BOOLEAN, child_race BIGINT, weight_pounds FLOAT, + plurality BIGINT, apgar_1min BIGINT, apgar_5min BIGINT, + mother_residence_state STRING, mother_race BIGINT, mother_age BIGINT, + gestation_weeks BIGINT, lmp STRING, mother_married BOOLEAN, + mother_birth_state STRING, cigarette_use BOOLEAN, cigarettes_per_day BIGINT, + alcohol_use BOOLEAN, drinks_per_week BIGINT, weight_gain_pounds BIGINT, + born_alive_alive BIGINT, born_alive_dead BIGINT, born_dead BIGINT, + ever_born BIGINT, father_race BIGINT, father_age BIGINT, + record_weight BIGINT +) +ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' +LOCATION 'gs://public-datasets/natality/csv'; + +SELECT * FROM natality_csv LIMIT 10;''' +EXPERIMENT_NAME = 'Dataproc - Submit Hive Job' +COMPONENT_SPEC_URI = 'https://raw.githubusercontent.com/kubeflow/pipelines/master/components/gcp/dataproc/submit_hive_job/component.yaml' +``` + +### Install KFP SDK +Install the SDK (uncomment the following code if the SDK is not already installed) + + +```python +# KFP_PACKAGE = 'https://storage.googleapis.com/ml-pipeline/release/0.1.12/kfp.tar.gz' +# !pip3 install $KFP_PACKAGE --upgrade +``` + +### Load component definitions + + +```python +import kfp.components as comp + +dataproc_submit_hive_job_op = comp.load_component_from_url(COMPONENT_SPEC_URI) +display(dataproc_submit_hive_job_op) +``` + +### Here is an illustrative pipeline that uses the component + + +```python +import kfp.dsl as dsl +import kfp.gcp as gcp +import json +@dsl.pipeline( + name='Dataproc submit Hive job pipeline', + description='Dataproc submit Hive job pipeline' +) +def dataproc_submit_hive_job_pipeline( + project_id = PROJECT_ID, + region = REGION, + cluster_name = CLUSTER_NAME, + queries = json.dumps([QUERY]), + query_file_uri = '', + script_variables = '', + hive_job='', + job='', + wait_interval='30' +): + dataproc_submit_hive_job_op(project_id, region, cluster_name, queries, query_file_uri, + script_variables, hive_job, job, wait_interval).apply(gcp.use_gcp_secret('user-gcp-sa')) + +``` + +### Compile the pipeline + + +```python +pipeline_func = dataproc_submit_hive_job_pipeline +pipeline_filename = pipeline_func.__name__ + '.pipeline.tar.gz' +import kfp.compiler as compiler +compiler.Compiler().compile(pipeline_func, pipeline_filename) +``` + +### Submit the pipeline for execution + + +```python +#Specify pipeline argument values +arguments = {} + +#Get or create an experiment and submit a pipeline run +import kfp +client = kfp.Client() +experiment = client.create_experiment(EXPERIMENT_NAME) + +#Submit a pipeline run +run_name = pipeline_func.__name__ + ' run' +run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename, arguments) +``` diff --git a/components/gcp/dataproc/submit_hive_job/sample.ipynb b/components/gcp/dataproc/submit_hive_job/sample.ipynb new file mode 100644 index 00000000000..e8de580066d --- /dev/null +++ b/components/gcp/dataproc/submit_hive_job/sample.ipynb @@ -0,0 +1,232 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Dataproc - Submit Hive Job\n", + "\n", + "## Intended Use\n", + "A Kubeflow Pipeline 
component to submit a Hive job on YARN in Google Cloud Dataproc service. \n", + "\n", + "## Run-Time Parameters:\n", + "Name | Description\n", + ":--- | :----------\n", + "project_id | Required. The ID of the Google Cloud Platform project that the cluster belongs to.\n", + "region | Required. The Cloud Dataproc region in which to handle the request.\n", + "cluster_name | Required. The cluster to run the job.\n", + "queries | Required. The queries to execute. You do not need to terminate a query with a semicolon. Multiple queries can be specified in one string by separating each with a semicolon. \n", + "query_file_uri | The HCFS URI of the script that contains Hive queries.\n", + "script_variables | Optional. Mapping of query variable names to values (equivalent to the Hive command: SET name=\"value\";).\n", + "hive_job | Optional. The full payload of a [HiveJob](https://cloud.google.com/dataproc/docs/reference/rest/v1/HiveJob)\n", + "job | Optional. The full payload of a [Dataproc job](https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.jobs).\n", + "wait_interval | Optional. The wait seconds between polling the operation. Defaults to 30s.\n", + "\n", + "## Output:\n", + "Name | Description\n", + ":--- | :----------\n", + "job_id | The ID of the created job." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Sample\n", + "\n", + "Note: the sample code below works in both IPython notebook or python code directly.\n", + "\n", + "### Setup a Dataproc cluster\n", + "Follow the [guide](https://cloud.google.com/dataproc/docs/guides/create-cluster) to create a new Dataproc cluster or reuse an existing one.\n", + "\n", + "### Prepare Hive query\n", + "Directly put your Hive queries in the `queries` list or upload your Hive queries into a file to a Google Cloud Storage (GCS) bucket and place the path in `query_file_uri`. 
In this sample, we will use a hard coded query in the `queries` list to select data from a public CSV file from GCS.\n", + "\n", + "For more details, please checkout [Hive language manual](https://cwiki.apache.org/confluence/display/Hive/LanguageManual)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Set sample parameters" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "tags": [ + "parameters" + ] + }, + "outputs": [], + "source": [ + "PROJECT_ID = ''\n", + "CLUSTER_NAME = ''\n", + "REGION = 'us-central1'\n", + "QUERY = '''\n", + "DROP TABLE IF EXISTS natality_csv;\n", + "CREATE EXTERNAL TABLE natality_csv (\n", + " source_year BIGINT, year BIGINT, month BIGINT, day BIGINT, wday BIGINT,\n", + " state STRING, is_male BOOLEAN, child_race BIGINT, weight_pounds FLOAT,\n", + " plurality BIGINT, apgar_1min BIGINT, apgar_5min BIGINT,\n", + " mother_residence_state STRING, mother_race BIGINT, mother_age BIGINT,\n", + " gestation_weeks BIGINT, lmp STRING, mother_married BOOLEAN,\n", + " mother_birth_state STRING, cigarette_use BOOLEAN, cigarettes_per_day BIGINT,\n", + " alcohol_use BOOLEAN, drinks_per_week BIGINT, weight_gain_pounds BIGINT,\n", + " born_alive_alive BIGINT, born_alive_dead BIGINT, born_dead BIGINT,\n", + " ever_born BIGINT, father_race BIGINT, father_age BIGINT,\n", + " record_weight BIGINT\n", + ")\n", + "ROW FORMAT DELIMITED FIELDS TERMINATED BY ','\n", + "LOCATION 'gs://public-datasets/natality/csv';\n", + "\n", + "SELECT * FROM natality_csv LIMIT 10;'''\n", + "EXPERIMENT_NAME = 'Dataproc - Submit SparkSQL Job'\n", + "COMPONENT_SPEC_URI = 'https://raw.githubusercontent.com/kubeflow/pipelines/master/components/gcp/dataproc/submit_hive_job/component.yaml'" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Install KFP SDK\n", + "Install the SDK (Uncomment the code if the SDK is not installed before)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "scrolled": true + }, + "outputs": [], + "source": [ + "# KFP_PACKAGE = 'https://storage.googleapis.com/ml-pipeline/release/0.1.12/kfp.tar.gz'\n", + "# !pip3 install $KFP_PACKAGE --upgrade" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Load component definitions" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import kfp.components as comp\n", + "\n", + "dataproc_submit_hive_job_op = comp.load_component_from_url(COMPONENT_SPEC_URI)\n", + "display(dataproc_submit_hive_job_op)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Here is an illustrative pipeline that uses the component" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import kfp.dsl as dsl\n", + "import kfp.gcp as gcp\n", + "import json\n", + "@dsl.pipeline(\n", + " name='Dataproc submit Hive job pipeline',\n", + " description='Dataproc submit Hive job pipeline'\n", + ")\n", + "def dataproc_submit_hive_job_pipeline(\n", + " project_id = PROJECT_ID, \n", + " region = REGION,\n", + " cluster_name = CLUSTER_NAME,\n", + " queries = json.dumps([QUERY]),\n", + " query_file_uri = '',\n", + " script_variables = '', \n", + " sparksql_job='', \n", + " job='', \n", + " wait_interval='30'\n", + "):\n", + " dataproc_submit_hive_job_op(project_id, region, cluster_name, queries, query_file_uri,\n", + " script_variables, sparksql_job, job, 
wait_interval).apply(gcp.use_gcp_secret('user-gcp-sa'))\n", + " " + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Compile the pipeline" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "pipeline_func = dataproc_submit_hive_job_pipeline\n", + "pipeline_filename = pipeline_func.__name__ + '.pipeline.tar.gz'\n", + "import kfp.compiler as compiler\n", + "compiler.Compiler().compile(pipeline_func, pipeline_filename)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Submit the pipeline for execution" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "#Specify pipeline argument values\n", + "arguments = {}\n", + "\n", + "#Get or create an experiment and submit a pipeline run\n", + "import kfp\n", + "client = kfp.Client()\n", + "experiment = client.create_experiment(EXPERIMENT_NAME)\n", + "\n", + "#Submit a pipeline run\n", + "run_name = pipeline_func.__name__ + ' run'\n", + "run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename, arguments)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.6.4" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/components/gcp/dataproc/submit_pig_job/README.md b/components/gcp/dataproc/submit_pig_job/README.md new file mode 100644 index 00000000000..b52b82d26ca --- /dev/null +++ b/components/gcp/dataproc/submit_pig_job/README.md @@ -0,0 +1,129 @@ + +# Dataproc - Submit Pig Job + +## Intended Use +A Kubeflow Pipeline component to submit a Pig job on YARN in Google Cloud Dataproc service. + +## Run-Time Parameters: +Name | Description +:--- | :---------- +project_id | Required. The ID of the Google Cloud Platform project that the cluster belongs to. +region | Required. The Cloud Dataproc region in which to handle the request. +cluster_name | Required. The cluster to run the job. +queries | Required. The queries to execute. You do not need to terminate a query with a semicolon. Multiple queries can be specified in one string by separating each with a semicolon. +query_file_uri | The HCFS URI of the script that contains Pig queries. +script_variables | Optional. Mapping of query variable names to values (equivalent to the Pig command: SET name="value";). +pig_job | Optional. The full payload of a [PigJob](https://cloud.google.com/dataproc/docs/reference/rest/v1/PigJob) +job | Optional. The full payload of a [Dataproc job](https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.jobs). +wait_interval | Optional. The wait seconds between polling the operation. Defaults to 30s. + +## Output: +Name | Description +:--- | :---------- +job_id | The ID of the created job. + +## Sample + +Note: the sample code below works in both IPython notebook or python code directly. + +### Setup a Dataproc cluster +Follow the [guide](https://cloud.google.com/dataproc/docs/guides/create-cluster) to create a new Dataproc cluster or reuse an existing one. 
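If you do not have a cluster yet, a minimal sketch of creating a small test cluster from a notebook cell is shown below (assuming the `gcloud` CLI is installed and authenticated; the project, region, and cluster names are placeholders):

```python
# A single-node cluster keeps the test inexpensive; delete it afterwards with the
# delete_cluster component or `gcloud dataproc clusters delete`.
!gcloud dataproc clusters create my-cluster --project my-project --region us-central1 --single-node
```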
+ +### Prepare Pig query +Directly put your Pig queries in the `queries` list or upload your Pig queries into a file to a Google Cloud Storage (GCS) bucket and place the path in `query_file_uri`. In this sample, we will use a hard coded query in the `queries` list to load and preview rows from a public natality CSV dataset in GCS. + +For more details, please check out the [Pig documentation](http://pig.apache.org/docs/latest/). + +### Set sample parameters + + +```python +PROJECT_ID = '' +CLUSTER_NAME = '' + +REGION = 'us-central1' +QUERY = ''' +natality_csv = load 'gs://public-datasets/natality/csv' using PigStorage(':'); +top_natality_csv = LIMIT natality_csv 10; +dump top_natality_csv;''' +EXPERIMENT_NAME = 'Dataproc - Submit Pig Job' +COMPONENT_SPEC_URI = 'https://raw.githubusercontent.com/kubeflow/pipelines/master/components/gcp/dataproc/submit_pig_job/component.yaml' +``` + +### Install KFP SDK +Install the SDK (uncomment the following code if the SDK is not already installed) + + +```python +# KFP_PACKAGE = 'https://storage.googleapis.com/ml-pipeline/release/0.1.12/kfp.tar.gz' +# !pip3 install $KFP_PACKAGE --upgrade +``` + +### Load component definitions + + +```python +import kfp.components as comp + +dataproc_submit_pig_job_op = comp.load_component_from_url(COMPONENT_SPEC_URI) +display(dataproc_submit_pig_job_op) +``` + +### Here is an illustrative pipeline that uses the component + + +```python +import kfp.dsl as dsl +import kfp.gcp as gcp +import json +@dsl.pipeline( + name='Dataproc submit Pig job pipeline', + description='Dataproc submit Pig job pipeline' +) +def dataproc_submit_pig_job_pipeline( + project_id = PROJECT_ID, + region = REGION, + cluster_name = CLUSTER_NAME, + queries = json.dumps([QUERY]), + query_file_uri = '', + script_variables = '', + pig_job='', + job='', + wait_interval='30' +): + dataproc_submit_pig_job_op(project_id, region, cluster_name, queries, query_file_uri, + script_variables, pig_job, job, wait_interval).apply(gcp.use_gcp_secret('user-gcp-sa')) + +``` + +### Compile the pipeline + + +```python +pipeline_func = dataproc_submit_pig_job_pipeline +pipeline_filename = pipeline_func.__name__ + '.pipeline.tar.gz' +import kfp.compiler as compiler +compiler.Compiler().compile(pipeline_func, pipeline_filename) +``` + +### Submit the pipeline for execution + + +```python +#Specify pipeline argument values +arguments = {} + +#Get or create an experiment and submit a pipeline run +import kfp +client = kfp.Client() +experiment = client.create_experiment(EXPERIMENT_NAME) + +#Submit a pipeline run +run_name = pipeline_func.__name__ + ' run' +run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename, arguments) +``` + + +```python + +``` diff --git a/components/gcp/dataproc/submit_pig_job/sample.ipynb b/components/gcp/dataproc/submit_pig_job/sample.ipynb new file mode 100644 index 00000000000..01375e691dd --- /dev/null +++ b/components/gcp/dataproc/submit_pig_job/sample.ipynb @@ -0,0 +1,226 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Dataproc - Submit Pig Job\n", + "\n", + "## Intended Use\n", + "A Kubeflow Pipeline component to submit a Pig job on YARN in Google Cloud Dataproc service. \n", + "\n", + "## Run-Time Parameters:\n", + "Name | Description\n", + ":--- | :----------\n", + "project_id | Required. The ID of the Google Cloud Platform project that the cluster belongs to.\n", + "region | Required. The Cloud Dataproc region in which to handle the request.\n", + "cluster_name | Required. The cluster to run the job.\n", + "queries | Required. 
The queries to execute. You do not need to terminate a query with a semicolon. Multiple queries can be specified in one string by separating each with a semicolon. \n", + "query_file_uri | The HCFS URI of the script that contains Pig queries.\n", + "script_variables | Optional. Mapping of query variable names to values (equivalent to the Pig command: SET name=\"value\";).\n", + "pig_job | Optional. The full payload of a [PigJob](https://cloud.google.com/dataproc/docs/reference/rest/v1/PigJob)\n", + "job | Optional. The full payload of a [Dataproc job](https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.jobs).\n", + "wait_interval | Optional. The wait seconds between polling the operation. Defaults to 30s.\n", + "\n", + "## Output:\n", + "Name | Description\n", + ":--- | :----------\n", + "job_id | The ID of the created job." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Sample\n", + "\n", + "Note: the sample code below works in both IPython notebook or python code directly.\n", + "\n", + "### Setup a Dataproc cluster\n", + "Follow the [guide](https://cloud.google.com/dataproc/docs/guides/create-cluster) to create a new Dataproc cluster or reuse an existing one.\n", + "\n", + "### Prepare Pig query\n", + "Directly put your Pig queries in the `queries` list or upload your Pig queries into a file to a Google Cloud Storage (GCS) bucket and place the path in `query_file_uri`. In this sample, we will use a hard coded query in the `queries` list to select data from a local `passwd` file.\n", + "\n", + "For more details, please checkout [Pig documentation](http://pig.apache.org/docs/latest/)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Set sample parameters" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "tags": [ + "parameters" + ] + }, + "outputs": [], + "source": [ + "PROJECT_ID = ''\n", + "CLUSTER_NAME = ''\n", + "\n", + "REGION = 'us-central1'\n", + "QUERY = '''\n", + "natality_csv = load 'gs://public-datasets/natality/csv' using PigStorage(':');\n", + "top_natality_csv = LIMIT natality_csv 10; \n", + "dump natality_csv;'''\n", + "EXPERIMENT_NAME = 'Dataproc - Submit Pig Job'\n", + "COMPONENT_SPEC_URI = 'https://raw.githubusercontent.com/kubeflow/pipelines/master/components/gcp/dataproc/submit_pig_job/component.yaml'" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Install KFP SDK\n", + "Install the SDK (Uncomment the code if the SDK is not installed before)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "scrolled": true + }, + "outputs": [], + "source": [ + "# KFP_PACKAGE = 'https://storage.googleapis.com/ml-pipeline/release/0.1.12/kfp.tar.gz'\n", + "# !pip3 install $KFP_PACKAGE --upgrade" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Load component definitions" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import kfp.components as comp\n", + "\n", + "dataproc_submit_pig_job_op = comp.load_component_from_url(COMPONENT_SPEC_URI)\n", + "display(dataproc_submit_pig_job_op)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Here is an illustrative pipeline that uses the component" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import kfp.dsl as dsl\n", + "import kfp.gcp as gcp\n", + "import json\n", + 
"@dsl.pipeline(\n", + " name='Dataproc submit Pig job pipeline',\n", + " description='Dataproc submit Pig job pipeline'\n", + ")\n", + "def dataproc_submit_pig_job_pipeline(\n", + " project_id = PROJECT_ID, \n", + " region = REGION,\n", + " cluster_name = CLUSTER_NAME,\n", + " queries = json.dumps([QUERY]),\n", + " query_file_uri = '',\n", + " script_variables = '', \n", + " pig_job='', \n", + " job='', \n", + " wait_interval='30'\n", + "):\n", + " dataproc_submit_pig_job_op(project_id, region, cluster_name, queries, query_file_uri,\n", + " script_variables, pig_job, job, wait_interval).apply(gcp.use_gcp_secret('user-gcp-sa'))\n", + " " + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Compile the pipeline" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "pipeline_func = dataproc_submit_pig_job_pipeline\n", + "pipeline_filename = pipeline_func.__name__ + '.pipeline.tar.gz'\n", + "import kfp.compiler as compiler\n", + "compiler.Compiler().compile(pipeline_func, pipeline_filename)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Submit the pipeline for execution" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "#Specify pipeline argument values\n", + "arguments = {}\n", + "\n", + "#Get or create an experiment and submit a pipeline run\n", + "import kfp\n", + "client = kfp.Client()\n", + "experiment = client.create_experiment(EXPERIMENT_NAME)\n", + "\n", + "#Submit a pipeline run\n", + "run_name = pipeline_func.__name__ + ' run'\n", + "run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename, arguments)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.6.4" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/components/gcp/dataproc/submit_pyspark_job/README.md b/components/gcp/dataproc/submit_pyspark_job/README.md new file mode 100644 index 00000000000..926a6adc090 --- /dev/null +++ b/components/gcp/dataproc/submit_pyspark_job/README.md @@ -0,0 +1,122 @@ + +# Dataproc - Submit PySpark Job + +## Intended Use +A Kubeflow Pipeline component to submit a PySpark job to Google Cloud Dataproc service. + +## Run-Time Parameters: +Name | Description +:--- | :---------- +project_id | Required. The ID of the Google Cloud Platform project that the cluster belongs to. +region | Required. The Cloud Dataproc region in which to handle the request. +cluster_name | Required. The cluster to run the job. +main_python_file_uri | Required. The HCFS URI of the main Python file to use as the driver. Must be a .py file. +args | Optional. The arguments to pass to the driver. Do not include arguments, such as --conf, that can be set as job properties, since a collision may occur that causes an incorrect job submission. +pyspark_job | Optional. The full payload of a [PySparkJob](https://cloud.google.com/dataproc/docs/reference/rest/v1/PySparkJob). +job | Optional. 
The full payload of a [Dataproc job](https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.jobs). +wait_interval | Optional. The wait seconds between polling the operation. Defaults to 30s. + +## Output: +Name | Description +:--- | :---------- +job_id | The ID of the created job. + +## Sample + +Note: the sample code below works in both IPython notebook or python code directly. + +### Setup a Dataproc cluster +Follow the [guide](https://cloud.google.com/dataproc/docs/guides/create-cluster) to create a new Dataproc cluster or reuse an existing one. + +### Prepare PySpark job +Upload your PySpark code file to a Google Cloud Storage (GCS) bucket. For example, here is a public accessible hello-world.py in GCS: + + +```python +!gsutil cat gs://dataproc-examples-2f10d78d114f6aaec76462e3c310f31f/src/pyspark/hello-world/hello-world.py +``` + +### Set sample parameters + + +```python +PROJECT_ID = '' +CLUSTER_NAME = '' +REGION = 'us-central1' +PYSPARK_FILE_URI = 'gs://dataproc-examples-2f10d78d114f6aaec76462e3c310f31f/src/pyspark/hello-world/hello-world.py' +ARGS = '' +EXPERIMENT_NAME = 'Dataproc - Submit PySpark Job' +SUBMIT_PYSPARK_JOB_SPEC_URI = 'https://raw.githubusercontent.com/kubeflow/pipelines/e5b0081cdcbef6a056c0da114d2eb81ab8d8152d/components/gcp/dataproc/submit_pyspark_job/component.yaml' +``` + +### Install KFP SDK +Install the SDK (Uncomment the code if the SDK is not installed before) + + +```python +# KFP_PACKAGE = 'https://storage.googleapis.com/ml-pipeline/release/0.1.12/kfp.tar.gz' +# !pip3 install $KFP_PACKAGE --upgrade +``` + +### Load component definitions + + +```python +import kfp.components as comp + +dataproc_submit_pyspark_job_op = comp.load_component_from_url(SUBMIT_PYSPARK_JOB_SPEC_URI) +display(dataproc_submit_pyspark_job_op) +``` + +### Here is an illustrative pipeline that uses the component + + +```python +import kfp.dsl as dsl +import kfp.gcp as gcp +import json +@dsl.pipeline( + name='Dataproc submit PySpark job pipeline', + description='Dataproc submit PySpark job pipeline' +) +def dataproc_submit_pyspark_job_pipeline( + project_id = PROJECT_ID, + region = REGION, + cluster_name = CLUSTER_NAME, + main_python_file_uri = PYSPARK_FILE_URI, + args = ARGS, + pyspark_job='{}', + job='{}', + wait_interval='30' +): + dataproc_submit_pyspark_job_op(project_id, region, cluster_name, main_python_file_uri, + args, pyspark_job, job, wait_interval).apply(gcp.use_gcp_secret('user-gcp-sa')) + +``` + +### Compile the pipeline + + +```python +pipeline_func = dataproc_submit_pyspark_job_pipeline +pipeline_filename = pipeline_func.__name__ + '.pipeline.tar.gz' +import kfp.compiler as compiler +compiler.Compiler().compile(pipeline_func, pipeline_filename) +``` + +### Submit the pipeline for execution + + +```python +#Specify pipeline argument values +arguments = {} + +#Get or create an experiment and submit a pipeline run +import kfp +client = kfp.Client() +experiment = client.create_experiment(EXPERIMENT_NAME) + +#Submit a pipeline run +run_name = pipeline_func.__name__ + ' run' +run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename, arguments) +``` diff --git a/components/gcp/dataproc/submit_pyspark_job/sample.ipynb b/components/gcp/dataproc/submit_pyspark_job/sample.ipynb new file mode 100644 index 00000000000..e73c9d6e58c --- /dev/null +++ b/components/gcp/dataproc/submit_pyspark_job/sample.ipynb @@ -0,0 +1,221 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Dataproc - Submit PySpark Job\n", + "\n", + 
"## Intended Use\n", + "A Kubeflow Pipeline component to submit a PySpark job to Google Cloud Dataproc service. \n", + "\n", + "## Run-Time Parameters:\n", + "Name | Description\n", + ":--- | :----------\n", + "project_id | Required. The ID of the Google Cloud Platform project that the cluster belongs to.\n", + "region | Required. The Cloud Dataproc region in which to handle the request.\n", + "cluster_name | Required. The cluster to run the job.\n", + "main_python_file_uri | Required. The HCFS URI of the main Python file to use as the driver. Must be a .py file.\n", + "args | Optional. The arguments to pass to the driver. Do not include arguments, such as --conf, that can be set as job properties, since a collision may occur that causes an incorrect job submission.\n", + "pyspark_job | Optional. The full payload of a [PySparkJob](https://cloud.google.com/dataproc/docs/reference/rest/v1/PySparkJob).\n", + "job | Optional. The full payload of a [Dataproc job](https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.jobs).\n", + "wait_interval | Optional. The wait seconds between polling the operation. Defaults to 30s.\n", + "\n", + "## Output:\n", + "Name | Description\n", + ":--- | :----------\n", + "job_id | The ID of the created job." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Sample\n", + "\n", + "Note: the sample code below works in both IPython notebook or python code directly.\n", + "\n", + "### Setup a Dataproc cluster\n", + "Follow the [guide](https://cloud.google.com/dataproc/docs/guides/create-cluster) to create a new Dataproc cluster or reuse an existing one.\n", + "\n", + "### Prepare PySpark job\n", + "Upload your PySpark code file to a Google Cloud Storage (GCS) bucket. For example, here is a public accessible hello-world.py in GCS:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "!gsutil cat gs://dataproc-examples-2f10d78d114f6aaec76462e3c310f31f/src/pyspark/hello-world/hello-world.py" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Set sample parameters" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "tags": [ + "parameters" + ] + }, + "outputs": [], + "source": [ + "PROJECT_ID = ''\n", + "CLUSTER_NAME = ''\n", + "REGION = 'us-central1'\n", + "PYSPARK_FILE_URI = 'gs://dataproc-examples-2f10d78d114f6aaec76462e3c310f31f/src/pyspark/hello-world/hello-world.py'\n", + "ARGS = ''\n", + "EXPERIMENT_NAME = 'Dataproc - Submit PySpark Job'\n", + "SUBMIT_PYSPARK_JOB_SPEC_URI = 'https://raw.githubusercontent.com/kubeflow/pipelines/e5b0081cdcbef6a056c0da114d2eb81ab8d8152d/components/gcp/dataproc/submit_pyspark_job/component.yaml'" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Install KFP SDK\n", + "Install the SDK (Uncomment the code if the SDK is not installed before)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "scrolled": true + }, + "outputs": [], + "source": [ + "# KFP_PACKAGE = 'https://storage.googleapis.com/ml-pipeline/release/0.1.12/kfp.tar.gz'\n", + "# !pip3 install $KFP_PACKAGE --upgrade" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Load component definitions" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import kfp.components as comp\n", + "\n", + "dataproc_submit_pyspark_job_op = 
comp.load_component_from_url(SUBMIT_PYSPARK_JOB_SPEC_URI)\n", + "display(dataproc_submit_pyspark_job_op)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Here is an illustrative pipeline that uses the component" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import kfp.dsl as dsl\n", + "import kfp.gcp as gcp\n", + "import json\n", + "@dsl.pipeline(\n", + " name='Dataproc submit PySpark job pipeline',\n", + " description='Dataproc submit PySpark job pipeline'\n", + ")\n", + "def dataproc_submit_pyspark_job_pipeline(\n", + " project_id = PROJECT_ID, \n", + " region = REGION,\n", + " cluster_name = CLUSTER_NAME,\n", + " main_python_file_uri = PYSPARK_FILE_URI, \n", + " args = ARGS, \n", + " pyspark_job='{}', \n", + " job='{}', \n", + " wait_interval='30'\n", + "):\n", + " dataproc_submit_pyspark_job_op(project_id, region, cluster_name, main_python_file_uri, \n", + " args, pyspark_job, job, wait_interval).apply(gcp.use_gcp_secret('user-gcp-sa'))\n", + " " + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Compile the pipeline" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "pipeline_func = dataproc_submit_pyspark_job_pipeline\n", + "pipeline_filename = pipeline_func.__name__ + '.pipeline.tar.gz'\n", + "import kfp.compiler as compiler\n", + "compiler.Compiler().compile(pipeline_func, pipeline_filename)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Submit the pipeline for execution" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "#Specify pipeline argument values\n", + "arguments = {}\n", + "\n", + "#Get or create an experiment and submit a pipeline run\n", + "import kfp\n", + "client = kfp.Client()\n", + "experiment = client.create_experiment(EXPERIMENT_NAME)\n", + "\n", + "#Submit a pipeline run\n", + "run_name = pipeline_func.__name__ + ' run'\n", + "run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename, arguments)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.6.4" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/components/gcp/dataproc/submit_spark_job/README.md b/components/gcp/dataproc/submit_spark_job/README.md new file mode 100644 index 00000000000..eab66e924d0 --- /dev/null +++ b/components/gcp/dataproc/submit_spark_job/README.md @@ -0,0 +1,124 @@ + +# Dataproc - Submit Spark Job + +## Intended Use +A Kubeflow Pipeline component to submit a Spark job on YARN in Google Cloud Dataproc service. + +## Run-Time Parameters: +Name | Description +:--- | :---------- +project_id | Required. The ID of the Google Cloud Platform project that the cluster belongs to. +region | Required. The Cloud Dataproc region in which to handle the request. +cluster_name | Required. The cluster to run the job. +main_jar_file_uri | The HCFS URI of the jar file that contains the main class. +main_class | The name of the driver's main class. The jar file that contains the class must be in the default CLASSPATH or specified in jarFileUris. +args | Optional. 
The arguments to pass to the driver. Do not include arguments, such as --conf, that can be set as job properties, since a collision may occur that causes an incorrect job submission. +spark_job | Optional. The full payload of a [SparkJob](https://cloud.google.com/dataproc/docs/reference/rest/v1/SparkJob). +job | Optional. The full payload of a [Dataproc job](https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.jobs). +wait_interval | Optional. The wait seconds between polling the operation. Defaults to 30s. + +## Output: +Name | Description +:--- | :---------- +job_id | The ID of the created job. + +## Sample + +Note: the sample code below works in both IPython notebook or python code directly. + +### Setup a Dataproc cluster +Follow the [guide](https://cloud.google.com/dataproc/docs/guides/create-cluster) to create a new Dataproc cluster or reuse an existing one. + +### Prepare Spark job +Upload your Spark jar file to a Google Cloud Storage (GCS) bucket. In the sample, we will use a jar file that is pre-installed in the main cluster `file:///usr/lib/spark/examples/jars/spark-examples.jar`. + +Here is the [source code of example](https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java). + +To package a self-contained spark application, follow the [instructions](https://spark.apache.org/docs/latest/quick-start.html#self-contained-applications). + +### Set sample parameters + + +```python +PROJECT_ID = '' +CLUSTER_NAME = '' +REGION = 'us-central1' +SPARK_FILE_URI = 'file:///usr/lib/spark/examples/jars/spark-examples.jar' +MAIN_CLASS = 'org.apache.spark.examples.SparkPi' +ARGS = ['1000'] +EXPERIMENT_NAME = 'Dataproc - Submit Spark Job' +COMPONENT_SPEC_URI = 'https://raw.githubusercontent.com/kubeflow/pipelines/7622e57666c17088c94282ccbe26d6a52768c226/components/gcp/dataproc/submit_spark_job/component.yaml' +``` + +### Install KFP SDK +Install the SDK (Uncomment the code if the SDK is not installed before) + + +```python +# KFP_PACKAGE = 'https://storage.googleapis.com/ml-pipeline/release/0.1.12/kfp.tar.gz' +# !pip3 install $KFP_PACKAGE --upgrade +``` + +### Load component definitions + + +```python +import kfp.components as comp + +dataproc_submit_spark_job_op = comp.load_component_from_url(COMPONENT_SPEC_URI) +display(dataproc_submit_spark_job_op) +``` + +### Here is an illustrative pipeline that uses the component + + +```python +import kfp.dsl as dsl +import kfp.gcp as gcp +import json +@dsl.pipeline( + name='Dataproc submit Spark job pipeline', + description='Dataproc submit Spark job pipeline' +) +def dataproc_submit_spark_job_pipeline( + project_id = PROJECT_ID, + region = REGION, + cluster_name = CLUSTER_NAME, + main_jar_file_uri = '', + main_class = MAIN_CLASS, + args = json.dumps(ARGS), + spark_job=json.dumps({ 'jarFileUris': [ SPARK_FILE_URI ] }), + job='{}', + wait_interval='30' +): + dataproc_submit_spark_job_op(project_id, region, cluster_name, main_jar_file_uri, main_class, + args, spark_job, job, wait_interval).apply(gcp.use_gcp_secret('user-gcp-sa')) + +``` + +### Compile the pipeline + + +```python +pipeline_func = dataproc_submit_spark_job_pipeline +pipeline_filename = pipeline_func.__name__ + '.pipeline.tar.gz' +import kfp.compiler as compiler +compiler.Compiler().compile(pipeline_func, pipeline_filename) +``` + +### Submit the pipeline for execution + + +```python +#Specify pipeline argument values +arguments = {} + +#Get or create an experiment and submit a pipeline run +import kfp +client = 
kfp.Client() +experiment = client.create_experiment(EXPERIMENT_NAME) + +#Submit a pipeline run +run_name = pipeline_func.__name__ + ' run' +run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename, arguments) +``` diff --git a/components/gcp/dataproc/submit_spark_job/component.yaml b/components/gcp/dataproc/submit_spark_job/component.yaml index 99f775574f4..e02f3d00c4d 100644 --- a/components/gcp/dataproc/submit_spark_job/component.yaml +++ b/components/gcp/dataproc/submit_spark_job/component.yaml @@ -28,9 +28,7 @@ inputs: - name: main_jar_file_uri default: '' description: >- - The name of the driver's main class. The jar file - that contains the class must be in the default CLASSPATH or specified - in jarFileUris. + The HCFS URI of the jar file that contains the main class. - name: main_class default: '' description: >- diff --git a/components/gcp/dataproc/submit_spark_job/sample.ipynb b/components/gcp/dataproc/submit_spark_job/sample.ipynb new file mode 100644 index 00000000000..084b7ef07b4 --- /dev/null +++ b/components/gcp/dataproc/submit_spark_job/sample.ipynb @@ -0,0 +1,219 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Dataproc - Submit Spark Job\n", + "\n", + "## Intended Use\n", + "A Kubeflow Pipeline component to submit a Spark job on YARN in Google Cloud Dataproc service. \n", + "\n", + "## Run-Time Parameters:\n", + "Name | Description\n", + ":--- | :----------\n", + "project_id | Required. The ID of the Google Cloud Platform project that the cluster belongs to.\n", + "region | Required. The Cloud Dataproc region in which to handle the request.\n", + "cluster_name | Required. The cluster to run the job.\n", + "main_jar_file_uri | The HCFS URI of the jar file that contains the main class.\n", + "main_class | The name of the driver's main class. The jar file that contains the class must be in the default CLASSPATH or specified in jarFileUris. \n", + "args | Optional. The arguments to pass to the driver. Do not include arguments, such as --conf, that can be set as job properties, since a collision may occur that causes an incorrect job submission.\n", + "spark_job | Optional. The full payload of a [SparkJob](https://cloud.google.com/dataproc/docs/reference/rest/v1/SparkJob).\n", + "job | Optional. The full payload of a [Dataproc job](https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.jobs).\n", + "wait_interval | Optional. The wait seconds between polling the operation. Defaults to 30s.\n", + "\n", + "## Output:\n", + "Name | Description\n", + ":--- | :----------\n", + "job_id | The ID of the created job." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Sample\n", + "\n", + "Note: the sample code below works in both IPython notebook or python code directly.\n", + "\n", + "### Setup a Dataproc cluster\n", + "Follow the [guide](https://cloud.google.com/dataproc/docs/guides/create-cluster) to create a new Dataproc cluster or reuse an existing one.\n", + "\n", + "### Prepare Spark job\n", + "Upload your Spark jar file to a Google Cloud Storage (GCS) bucket. In the sample, we will use a jar file that is pre-installed in the main cluster `file:///usr/lib/spark/examples/jars/spark-examples.jar`. 
\n", + "\n", + "Here is the [source code of example](https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java).\n", + "\n", + "To package a self-contained spark application, follow the [instructions](https://spark.apache.org/docs/latest/quick-start.html#self-contained-applications)." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Set sample parameters" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "tags": [ + "parameters" + ] + }, + "outputs": [], + "source": [ + "PROJECT_ID = ''\n", + "CLUSTER_NAME = ''\n", + "REGION = 'us-central1'\n", + "SPARK_FILE_URI = 'file:///usr/lib/spark/examples/jars/spark-examples.jar'\n", + "MAIN_CLASS = 'org.apache.spark.examples.SparkPi'\n", + "ARGS = ['1000']\n", + "EXPERIMENT_NAME = 'Dataproc - Submit Spark Job'\n", + "COMPONENT_SPEC_URI = 'https://raw.githubusercontent.com/kubeflow/pipelines/7622e57666c17088c94282ccbe26d6a52768c226/components/gcp/dataproc/submit_spark_job/component.yaml'" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Install KFP SDK\n", + "Install the SDK (Uncomment the code if the SDK is not installed before)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "scrolled": true + }, + "outputs": [], + "source": [ + "# KFP_PACKAGE = 'https://storage.googleapis.com/ml-pipeline/release/0.1.12/kfp.tar.gz'\n", + "# !pip3 install $KFP_PACKAGE --upgrade" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Load component definitions" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import kfp.components as comp\n", + "\n", + "dataproc_submit_spark_job_op = comp.load_component_from_url(COMPONENT_SPEC_URI)\n", + "display(dataproc_submit_spark_job_op)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Here is an illustrative pipeline that uses the component" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import kfp.dsl as dsl\n", + "import kfp.gcp as gcp\n", + "import json\n", + "@dsl.pipeline(\n", + " name='Dataproc submit Spark job pipeline',\n", + " description='Dataproc submit Spark job pipeline'\n", + ")\n", + "def dataproc_submit_spark_job_pipeline(\n", + " project_id = PROJECT_ID, \n", + " region = REGION,\n", + " cluster_name = CLUSTER_NAME,\n", + " main_jar_file_uri = '',\n", + " main_class = MAIN_CLASS,\n", + " args = json.dumps(ARGS), \n", + " spark_job=json.dumps({ 'jarFileUris': [ SPARK_FILE_URI ] }), \n", + " job='{}', \n", + " wait_interval='30'\n", + "):\n", + " dataproc_submit_spark_job_op(project_id, region, cluster_name, main_jar_file_uri, main_class,\n", + " args, spark_job, job, wait_interval).apply(gcp.use_gcp_secret('user-gcp-sa'))\n", + " " + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Compile the pipeline" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "pipeline_func = dataproc_submit_spark_job_pipeline\n", + "pipeline_filename = pipeline_func.__name__ + '.pipeline.tar.gz'\n", + "import kfp.compiler as compiler\n", + "compiler.Compiler().compile(pipeline_func, pipeline_filename)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Submit the pipeline for execution" + ] + }, + { + "cell_type": "code", + "execution_count": null, + 
"metadata": {}, + "outputs": [], + "source": [ + "#Specify pipeline argument values\n", + "arguments = {}\n", + "\n", + "#Get or create an experiment and submit a pipeline run\n", + "import kfp\n", + "client = kfp.Client()\n", + "experiment = client.create_experiment(EXPERIMENT_NAME)\n", + "\n", + "#Submit a pipeline run\n", + "run_name = pipeline_func.__name__ + ' run'\n", + "run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename, arguments)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.6.4" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/components/gcp/dataproc/submit_sparksql_job/README.md b/components/gcp/dataproc/submit_sparksql_job/README.md new file mode 100644 index 00000000000..891e6f5d45f --- /dev/null +++ b/components/gcp/dataproc/submit_sparksql_job/README.md @@ -0,0 +1,142 @@ + +# Dataproc - Submit SparkSQL Job + +## Intended Use +A Kubeflow Pipeline component to submit a SparkSql job on YARN in Google Cloud Dataproc service. + +## Run-Time Parameters: +Name | Description +:--- | :---------- +project_id | Required. The ID of the Google Cloud Platform project that the cluster belongs to. +region | Required. The Cloud Dataproc region in which to handle the request. +cluster_name | Required. The cluster to run the job. +queries | Required. The queries to execute. You do not need to terminate a query with a semicolon. Multiple queries can be specified in one string by separating each with a semicolon. +query_file_uri | The HCFS URI of the script that contains SQL queries. +script_variables | Optional. Mapping of query variable names to values (equivalent to the Spark SQL command: SET name="value";). +sparksql_job | Optional. The full payload of a [SparkSqlJob](https://cloud.google.com/dataproc/docs/reference/rest/v1/SparkSqlJob) +job | Optional. The full payload of a [Dataproc job](https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.jobs). +wait_interval | Optional. The wait seconds between polling the operation. Defaults to 30s. + +## Output: +Name | Description +:--- | :---------- +job_id | The ID of the created job. + +## Sample + +Note: the sample code below works in both IPython notebook or python code directly. + +### Setup a Dataproc cluster +Follow the [guide](https://cloud.google.com/dataproc/docs/guides/create-cluster) to create a new Dataproc cluster or reuse an existing one.Directly put your Hive queries in the `queries` list or upload your Hive queries into a file to a Google Cloud Storage (GCS) bucket and place the path in `query_file_uri`. In this sample, we will use a hard coded query in the `queries` list to select data from a public CSV file from GCS. + +### Prepare SparkSQL job +Directly put your SparkSQL queries in the `queires` list or upload your SparkSQL queries into a file to a Google Cloud Storage (GCS) bucket and place the path in `query_file_uri`. In this sample, we will use a hard coded query in the `queries` list to select data from a public CSV file from GCS. 
+ +For more details about Spark SQL, please checkout the [programming guide](https://spark.apache.org/docs/latest/sql-programming-guide.html) + +### Set sample parameters + + +```python +PROJECT_ID = '' +CLUSTER_NAME = '' +REGION = 'us-central1' +QUERY = ''' +DROP TABLE IF EXISTS natality_csv; +CREATE EXTERNAL TABLE natality_csv ( + source_year BIGINT, year BIGINT, month BIGINT, day BIGINT, wday BIGINT, + state STRING, is_male BOOLEAN, child_race BIGINT, weight_pounds FLOAT, + plurality BIGINT, apgar_1min BIGINT, apgar_5min BIGINT, + mother_residence_state STRING, mother_race BIGINT, mother_age BIGINT, + gestation_weeks BIGINT, lmp STRING, mother_married BOOLEAN, + mother_birth_state STRING, cigarette_use BOOLEAN, cigarettes_per_day BIGINT, + alcohol_use BOOLEAN, drinks_per_week BIGINT, weight_gain_pounds BIGINT, + born_alive_alive BIGINT, born_alive_dead BIGINT, born_dead BIGINT, + ever_born BIGINT, father_race BIGINT, father_age BIGINT, + record_weight BIGINT +) +ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' +LOCATION 'gs://public-datasets/natality/csv'; + +SELECT * FROM natality_csv LIMIT 10;''' +EXPERIMENT_NAME = 'Dataproc - Submit SparkSQL Job' +COMPONENT_SPEC_URI = 'https://raw.githubusercontent.com/kubeflow/pipelines/master/components/gcp/dataproc/submit_sparksql_job/component.yaml' +``` + +### Install KFP SDK +Install the SDK (Uncomment the code if the SDK is not installed before) + + +```python +# KFP_PACKAGE = 'https://storage.googleapis.com/ml-pipeline/release/0.1.12/kfp.tar.gz' +# !pip3 install $KFP_PACKAGE --upgrade +``` + +### Load component definitions + + +```python +import kfp.components as comp + +dataproc_submit_sparksql_job_op = comp.load_component_from_url(COMPONENT_SPEC_URI) +display(dataproc_submit_sparksql_job_op) +``` + +### Here is an illustrative pipeline that uses the component + + +```python +import kfp.dsl as dsl +import kfp.gcp as gcp +import json +@dsl.pipeline( + name='Dataproc submit SparkSQL job pipeline', + description='Dataproc submit SparkSQL job pipeline' +) +def dataproc_submit_sparksql_job_pipeline( + project_id = PROJECT_ID, + region = REGION, + cluster_name = CLUSTER_NAME, + queries = json.dumps([QUERY]), + query_file_uri = '', + script_variables = '', + sparksql_job='', + job='', + wait_interval='30' +): + dataproc_submit_sparksql_job_op(project_id, region, cluster_name, queries, query_file_uri, + script_variables, sparksql_job, job, wait_interval).apply(gcp.use_gcp_secret('user-gcp-sa')) + +``` + +### Compile the pipeline + + +```python +pipeline_func = dataproc_submit_sparksql_job_pipeline +pipeline_filename = pipeline_func.__name__ + '.pipeline.tar.gz' +import kfp.compiler as compiler +compiler.Compiler().compile(pipeline_func, pipeline_filename) +``` + +### Submit the pipeline for execution + + +```python +#Specify pipeline argument values +arguments = {} + +#Get or create an experiment and submit a pipeline run +import kfp +client = kfp.Client() +experiment = client.create_experiment(EXPERIMENT_NAME) + +#Submit a pipeline run +run_name = pipeline_func.__name__ + ' run' +run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename, arguments) +``` + + +```python + +``` diff --git a/components/gcp/dataproc/submit_sparksql_job/sample.ipynb b/components/gcp/dataproc/submit_sparksql_job/sample.ipynb new file mode 100644 index 00000000000..c11444086e3 --- /dev/null +++ b/components/gcp/dataproc/submit_sparksql_job/sample.ipynb @@ -0,0 +1,239 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# 
Dataproc - Submit SparkSQL Job\n",
    "\n",
    "## Intended Use\n",
    "A Kubeflow Pipeline component to submit a SparkSQL job on YARN in Google Cloud Dataproc service. \n",
    "\n",
    "## Run-Time Parameters:\n",
    "Name | Description\n",
    ":--- | :----------\n",
    "project_id | Required. The ID of the Google Cloud Platform project that the cluster belongs to.\n",
    "region | Required. The Cloud Dataproc region in which to handle the request.\n",
    "cluster_name | Required. The cluster to run the job.\n",
    "queries | Required. The queries to execute. You do not need to terminate a query with a semicolon. Multiple queries can be specified in one string by separating each with a semicolon. \n",
    "query_file_uri | The HCFS URI of the script that contains SQL queries.\n",
    "script_variables | Optional. Mapping of query variable names to values (equivalent to the Spark SQL command: SET name=\"value\";).\n",
    "sparksql_job | Optional. The full payload of a [SparkSqlJob](https://cloud.google.com/dataproc/docs/reference/rest/v1/SparkSqlJob).\n",
    "job | Optional. The full payload of a [Dataproc job](https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.jobs).\n",
    "wait_interval | Optional. The wait seconds between polling the operation. Defaults to 30s.\n",
    "\n",
    "## Output:\n",
    "Name | Description\n",
    ":--- | :----------\n",
    "job_id | The ID of the created job."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Sample\n",
    "\n",
    "Note: The sample code below works both in an IPython notebook and as standalone Python code.\n",
    "\n",
    "### Setup a Dataproc cluster\n",
    "Follow the [guide](https://cloud.google.com/dataproc/docs/guides/create-cluster) to create a new Dataproc cluster or reuse an existing one.\n",
    "\n",
    "### Prepare SparkSQL job\n",
    "Directly put your SparkSQL queries in the `queries` list, or upload your SparkSQL queries into a file in a Google Cloud Storage (GCS) bucket and place the path in `query_file_uri`. 
In this sample, we will use a hard coded query in the `queries` list to select data from a public CSV file from GCS.\n", + "\n", + "For more details about Spark SQL, please checkout the [programming guide](https://spark.apache.org/docs/latest/sql-programming-guide.html)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Set sample parameters" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "tags": [ + "parameters" + ] + }, + "outputs": [], + "source": [ + "PROJECT_ID = ''\n", + "CLUSTER_NAME = ''\n", + "REGION = 'us-central1'\n", + "QUERY = '''\n", + "DROP TABLE IF EXISTS natality_csv;\n", + "CREATE EXTERNAL TABLE natality_csv (\n", + " source_year BIGINT, year BIGINT, month BIGINT, day BIGINT, wday BIGINT,\n", + " state STRING, is_male BOOLEAN, child_race BIGINT, weight_pounds FLOAT,\n", + " plurality BIGINT, apgar_1min BIGINT, apgar_5min BIGINT,\n", + " mother_residence_state STRING, mother_race BIGINT, mother_age BIGINT,\n", + " gestation_weeks BIGINT, lmp STRING, mother_married BOOLEAN,\n", + " mother_birth_state STRING, cigarette_use BOOLEAN, cigarettes_per_day BIGINT,\n", + " alcohol_use BOOLEAN, drinks_per_week BIGINT, weight_gain_pounds BIGINT,\n", + " born_alive_alive BIGINT, born_alive_dead BIGINT, born_dead BIGINT,\n", + " ever_born BIGINT, father_race BIGINT, father_age BIGINT,\n", + " record_weight BIGINT\n", + ")\n", + "ROW FORMAT DELIMITED FIELDS TERMINATED BY ','\n", + "LOCATION 'gs://public-datasets/natality/csv';\n", + "\n", + "SELECT * FROM natality_csv LIMIT 10;'''\n", + "EXPERIMENT_NAME = 'Dataproc - Submit SparkSQL Job'\n", + "COMPONENT_SPEC_URI = 'https://raw.githubusercontent.com/kubeflow/pipelines/master/components/gcp/dataproc/submit_sparksql_job/component.yaml'" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Install KFP SDK\n", + "Install the SDK (Uncomment the code if the SDK is not installed before)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "scrolled": true + }, + "outputs": [], + "source": [ + "# KFP_PACKAGE = 'https://storage.googleapis.com/ml-pipeline/release/0.1.12/kfp.tar.gz'\n", + "# !pip3 install $KFP_PACKAGE --upgrade" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Load component definitions" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import kfp.components as comp\n", + "\n", + "dataproc_submit_sparksql_job_op = comp.load_component_from_url(COMPONENT_SPEC_URI)\n", + "display(dataproc_submit_sparksql_job_op)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Here is an illustrative pipeline that uses the component" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import kfp.dsl as dsl\n", + "import kfp.gcp as gcp\n", + "import json\n", + "@dsl.pipeline(\n", + " name='Dataproc submit SparkSQL job pipeline',\n", + " description='Dataproc submit SparkSQL job pipeline'\n", + ")\n", + "def dataproc_submit_sparksql_job_pipeline(\n", + " project_id = PROJECT_ID, \n", + " region = REGION,\n", + " cluster_name = CLUSTER_NAME,\n", + " queries = json.dumps([QUERY]),\n", + " query_file_uri = '',\n", + " script_variables = '', \n", + " sparksql_job='', \n", + " job='', \n", + " wait_interval='30'\n", + "):\n", + " dataproc_submit_sparksql_job_op(project_id, region, cluster_name, queries, query_file_uri,\n", + " script_variables, 
sparksql_job, job, wait_interval).apply(gcp.use_gcp_secret('user-gcp-sa'))\n", + " " + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Compile the pipeline" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "pipeline_func = dataproc_submit_sparksql_job_pipeline\n", + "pipeline_filename = pipeline_func.__name__ + '.pipeline.tar.gz'\n", + "import kfp.compiler as compiler\n", + "compiler.Compiler().compile(pipeline_func, pipeline_filename)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Submit the pipeline for execution" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "#Specify pipeline argument values\n", + "arguments = {}\n", + "\n", + "#Get or create an experiment and submit a pipeline run\n", + "import kfp\n", + "client = kfp.Client()\n", + "experiment = client.create_experiment(EXPERIMENT_NAME)\n", + "\n", + "#Submit a pipeline run\n", + "run_name = pipeline_func.__name__ + ' run'\n", + "run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename, arguments)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.6.4" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +}