Add documentation and sample notebook for dataproc components. #971

Merged: 1 commit, Mar 15, 2019
```diff
@@ -25,9 +25,7 @@ def submit_spark_job(project_id, region, cluster_name,
     region (str): Required. The Cloud Dataproc region in which to handle the
         request.
     cluster_name (str): Required. The cluster to run the job.
-    main_jar_file_uri (str): The name of the driver's main class. The jar file
-        that contains the class must be in the default CLASSPATH or specified
-        in jarFileUris.
+    main_jar_file_uri (str): The HCFS URI of the jar file that contains the main class.
     main_class (str): The name of the driver's main class. The jar file that
         contains the class must be in the default CLASSPATH or specified in
         jarFileUris.
```
112 changes: 112 additions & 0 deletions components/gcp/dataproc/create_cluster/README.md
@@ -0,0 +1,112 @@

# Dataproc - Create Cluster

## Intended Use
A Kubeflow Pipeline component to create a cluster in the Google Cloud Dataproc service.

## Run-Time Parameters:
Name | Description
:--- | :----------
project_id | Required. The ID of the Google Cloud Platform project that the cluster belongs to.
region | Required. The Cloud Dataproc region in which to handle the request.
name | Optional. The cluster name. Cluster names within a project must be unique. Names of deleted clusters can be reused.
name_prefix | Optional. The prefix of the cluster name.
initialization_actions | Optional. List of GCS URIs of executables to execute on each node after config is completed. By default, executables are run on master and all worker nodes.
config_bucket | Optional. A Google Cloud Storage bucket used to stage job dependencies, config files, and job driver console output.
image_version | Optional. The version of software inside the cluster.
cluster | Optional. The full [cluster config](https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.clusters#Cluster). A minimal sketch follows this table.
wait_interval | Optional. The number of seconds to wait between polls of the operation status. Defaults to 30.
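
When provided, `cluster` is the full JSON cluster config from the REST reference linked above. Below is a minimal sketch, assuming the camelCase field names used by the REST API; the zone and machine types are illustrative values, not component defaults:

```python
import json

# Illustrative cluster config; field names follow the Dataproc REST
# reference. All values here are examples, not component defaults.
CLUSTER_CONFIG = json.dumps({
    'config': {
        'gceClusterConfig': {'zoneUri': 'us-central1-a'},
        'masterConfig': {'numInstances': 1, 'machineTypeUri': 'n1-standard-4'},
        'workerConfig': {'numInstances': 2, 'machineTypeUri': 'n1-standard-4'},
    }
})
```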

## Output:
Name | Description
:--- | :----------
cluster_name | The name of the created cluster.

## Sample

Note: The sample code below works both in an IPython notebook and as standalone Python code.

### Set sample parameters


```python
# Required Parameters
PROJECT_ID = '<Please put your project ID here>'

# Optional Parameters
EXPERIMENT_NAME = 'Dataproc - Create Cluster'
COMPONENT_SPEC_URI = 'https://raw.githubusercontent.com/kubeflow/pipelines/e5b0081cdcbef6a056c0da114d2eb81ab8d8152d/components/gcp/dataproc/create_cluster/component.yaml'
```

### Install KFP SDK
Install the SDK (uncomment the code below if the SDK is not already installed).


```python
#KFP_PACKAGE = 'https://storage.googleapis.com/ml-pipeline/release/0.1.12/kfp.tar.gz'
#!pip3 install $KFP_PACKAGE --upgrade
```
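
To verify the installation, a quick import check (this assumes the installed `kfp` package exposes `__version__`):

```python
# Sanity check: confirm the SDK is importable.
# Assumes the kfp package exposes __version__.
import kfp
print(kfp.__version__)
```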

### Load component definitions


```python
import kfp.components as comp

# Load the component from its versioned spec; the result is a factory
# function for creating pipeline ops.
dataproc_create_cluster_op = comp.load_component_from_url(COMPONENT_SPEC_URI)
# display() requires IPython; use print() in a plain Python script.
display(dataproc_create_cluster_op)
```
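
The factory function's signature mirrors the inputs declared in `component.yaml`; `help` prints them:

```python
# Inspect the factory function created from the component spec.
help(dataproc_create_cluster_op)
```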

### An example pipeline that uses the component


```python
import kfp.dsl as dsl
import kfp.gcp as gcp

@dsl.pipeline(
    name='Dataproc create cluster pipeline',
    description='Dataproc create cluster pipeline'
)
def dataproc_create_cluster_pipeline(
    project_id = PROJECT_ID,
    region = 'us-central1',
    name='',
    name_prefix='',
    initialization_actions='',
    config_bucket='',
    image_version='',
    cluster='',
    wait_interval='30'
):
    dataproc_create_cluster_op(project_id, region, name, name_prefix, initialization_actions,
        config_bucket, image_version, cluster, wait_interval).apply(gcp.use_gcp_secret('user-gcp-sa'))
```

### Compile the pipeline


```python
import kfp.compiler as compiler

pipeline_func = dataproc_create_cluster_pipeline
pipeline_filename = pipeline_func.__name__ + '.pipeline.tar.gz'
compiler.Compiler().compile(pipeline_func, pipeline_filename)
```
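
The compiler writes a gzipped tarball. Listing its contents is a quick check that compilation produced a workflow file; a sketch using only the standard library:

```python
import tarfile

# List the compiled package contents; the compiler is expected to
# place the serialized workflow (YAML) inside the tarball.
with tarfile.open(pipeline_filename) as tar:
    print(tar.getnames())
```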

### Submit the pipeline for execution


```python
# Specify pipeline argument values
arguments = {}

# Get or create an experiment and submit a pipeline run
import kfp
client = kfp.Client()
experiment = client.create_experiment(EXPERIMENT_NAME)

# Submit a pipeline run
run_name = pipeline_func.__name__ + ' run'
run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename, arguments)
```
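
To block until the run finishes, the client exposes a wait helper; a sketch, assuming `wait_for_run_completion` is available in your KFP SDK version:

```python
# Wait up to 30 minutes for the run to complete, then print its status.
# wait_for_run_completion may be absent in very old SDK releases.
run_detail = client.wait_for_run_completion(run_result.id, timeout=1800)
print(run_detail.run.status)
```
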
207 changes: 207 additions & 0 deletions components/gcp/dataproc/create_cluster/sample.ipynb
@@ -0,0 +1,207 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Dataproc - Create Cluster\n",
"\n",
"## Intended Use\n",
"A Kubeflow Pipeline component to create a cluster in Google Cloud Dataproc service. \n",
"\n",
"## Run-Time Parameters:\n",
"Name | Description\n",
":--- | :----------\n",
"project_id | Required. The ID of the Google Cloud Platform project that the cluster belongs to.\n",
"region | Required. The Cloud Dataproc region in which to handle the request.\n",
"name | Optional. The cluster name. Cluster names within a project must be unique. Names of deleted clusters can be reused.\n",
"name_prefix | Optional. The prefix of the cluster name.\n",
"initialization_actions | Optional. List of GCS URIs of executables to execute on each node after config is completed. By default, executables are run on master and all worker nodes. \n",
"config_bucket | Optional. A Google Cloud Storage bucket used to stage job dependencies, config files, and job driver console output.\n",
"image_version | Optional. The version of software inside the cluster.\n",
"cluster | Optional. The full [cluster config](https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.clusters#Cluster)\n",
"wait_interval | The wait seconds between polling the operation. Defaults to 30s.\n",
"\n",
"## Output:\n",
"Name | Description\n",
":--- | :----------\n",
"cluster_name | The cluster name of the created cluster."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Sample\n",
"\n",
"Note: the sample code below works in both IPython notebook or python code directly."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Set sample parameters"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": [
"parameters"
]
},
"outputs": [],
"source": [
"# Required Parameters\n",
"PROJECT_ID = '<Please put your project ID here>'\n",
"\n",
"# Optional Parameters\n",
"EXPERIMENT_NAME = 'Dataproc - Create Cluster'\n",
"COMPONENT_SPEC_URI = 'https://raw.githubusercontent.com/kubeflow/pipelines/e5b0081cdcbef6a056c0da114d2eb81ab8d8152d/components/gcp/dataproc/create_cluster/component.yaml'"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Install KFP SDK\n",
"Install the SDK (Uncomment the code if the SDK is not installed before)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"scrolled": true
},
"outputs": [],
"source": [
"#KFP_PACKAGE = 'https://storage.googleapis.com/ml-pipeline/release/0.1.12/kfp.tar.gz'\n",
"#!pip3 install $KFP_PACKAGE --upgrade"
]
},
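{
"cell_type": "markdown",
"metadata": {},
"source": [
"To verify the installation, a quick import check (this assumes the installed `kfp` package exposes `__version__`):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Sanity check: confirm the SDK is importable.\n",
"# Assumes the kfp package exposes __version__.\n",
"import kfp\n",
"print(kfp.__version__)"
]
},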
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Load component definitions"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import kfp.components as comp\n",
"\n",
"dataproc_create_cluster_op = comp.load_component_from_url(COMPONENT_SPEC_URI)\n",
"display(dataproc_create_cluster_op)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Here is an illustrative pipeline that uses the component"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import kfp.dsl as dsl\n",
"import kfp.gcp as gcp\n",
"import json\n",
"@dsl.pipeline(\n",
" name='Dataproc create cluster pipeline',\n",
" description='Dataproc create cluster pipeline'\n",
")\n",
"def dataproc_create_cluster_pipeline(\n",
" project_id = PROJECT_ID, \n",
" region = 'us-central1', \n",
" name='', \n",
" name_prefix='', \n",
" job_name_prefix='', \n",
" initialization_actions='', \n",
" config_bucket='', \n",
" image_version='', \n",
" cluster='', \n",
" wait_interval='30'\n",
"):\n",
" dataproc_create_cluster_op(project_id, region, name, name_prefix, job_name_prefix, initialization_actions, \n",
" config_bucket, image_version, cluster, wait_interval).apply(gcp.use_gcp_secret('user-gcp-sa'))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Compile the pipeline"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"pipeline_func = dataproc_create_cluster_pipeline\n",
"pipeline_filename = pipeline_func.__name__ + '.pipeline.tar.gz'\n",
"import kfp.compiler as compiler\n",
"compiler.Compiler().compile(pipeline_func, pipeline_filename)"
]
},
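{
"cell_type": "markdown",
"metadata": {},
"source": [
"The compiler writes a gzipped tarball. Listing its contents is a quick check that compilation produced a workflow file (standard library only):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# List the compiled package contents; the compiler is expected to\n",
"# place the serialized workflow (YAML) inside the tarball.\n",
"import tarfile\n",
"with tarfile.open(pipeline_filename) as tar:\n",
"    print(tar.getnames())"
]
},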
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Submit the pipeline for execution"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#Specify pipeline argument values\n",
"arguments = {}\n",
"\n",
"#Get or create an experiment and submit a pipeline run\n",
"import kfp\n",
"client = kfp.Client()\n",
"experiment = client.create_experiment(EXPERIMENT_NAME)\n",
"\n",
"#Submit a pipeline run\n",
"run_name = pipeline_func.__name__ + ' run'\n",
"run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename, arguments)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.4"
}
},
"nbformat": 4,
"nbformat_minor": 2
}