Add sample notebook and readme markdown for GCP components. #899

Merged · 9 commits · Mar 5, 2019
112 changes: 112 additions & 0 deletions components/gcp/bigquery/query/README.md
@@ -0,0 +1,112 @@

# BigQuery - Query

## Intended Use
A Kubeflow Pipeline component to submit a query to the Google Cloud BigQuery service and dump the results to a Google Cloud Storage blob.

## Run-Time Parameters:
Name | Description
:--- | :----------
query | The query used by the BigQuery service to fetch the results.
project_id | The ID of the project in which to execute the query job.
dataset_id | The ID of the persistent dataset to keep the results of the query. If the dataset does not exist, the operation will create a new one.
table_id | The ID of the table to keep the results of the query. If absent, the operation will generate a random ID for the table.
output_gcs_path | The GCS blob path to dump the query results to.
dataset_location | The location to create the dataset. Defaults to `US`.
job_config | The full config spec for the query job. See [QueryJobConfig](https://googleapis.github.io/google-cloud-python/latest/bigquery/generated/google.cloud.bigquery.job.QueryJobConfig.html#google.cloud.bigquery.job.QueryJobConfig) for details.
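
A sketch of a `job_config` value, assuming the component accepts a JSON-serialized payload whose fields mirror `QueryJobConfig` in its API representation (the exact schema the component parses is an assumption here, not confirmed by the component spec):

```python
import json

# Hypothetical job_config payload; field names follow the BigQuery API
# representation of google.cloud.bigquery.job.QueryJobConfig.
JOB_CONFIG = json.dumps({
    'query': {
        'useLegacySql': False,                # run with standard SQL
        'writeDisposition': 'WRITE_TRUNCATE'  # overwrite the result table
    }
})
```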

## Output:
Name | Description
:--- | :----------
output_gcs_path | The GCS blob path to dump the query results to.

## Sample

Note: The sample code below works both in an IPython notebook and as standalone Python code.

### Set sample parameters


```python
# Required Parameters
PROJECT_ID = '<Please put your project ID here>'
GCS_WORKING_DIR = 'gs://<Please put your GCS path here>' # No ending slash

# Optional Parameters
EXPERIMENT_NAME = 'BigQuery - Query'
COMPONENT_SPEC_URI = 'https://raw.githubusercontent.com/kubeflow/pipelines/master/components/gcp/bigquery/query/component.yaml'
```
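
Loading the component spec from `master` tracks the latest definition; for reproducible runs you may prefer pinning `COMPONENT_SPEC_URI` to a release tag. The tagged URL below is illustrative, not verified:

```python
# Hypothetical pinned alternative; substitute a real release tag.
# COMPONENT_SPEC_URI = ('https://raw.githubusercontent.com/kubeflow/pipelines/'
#                       '0.1.11/components/gcp/bigquery/query/component.yaml')
```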

### Install KFP SDK


```python
# Install the SDK (uncomment if the SDK is not already installed)
# KFP_PACKAGE = 'https://storage.googleapis.com/ml-pipeline/release/0.1.11/kfp.tar.gz'
# !pip3 install $KFP_PACKAGE --upgrade
```
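
A quick sanity check that the SDK is importable; this is a sketch and assumes the installed package exposes a `__version__` attribute:

```python
import kfp
print(kfp.__version__)  # assumed attribute; some releases may not expose it
```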

### Load component definitions


```python
import kfp.components as comp

bigquery_query_op = comp.load_component_from_url(COMPONENT_SPEC_URI)
display(bigquery_query_op)
```
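
Optionally, inspect the loaded op before wiring it into a pipeline. This sketch assumes the op is a plain Python factory function whose signature mirrors the inputs declared in `component.yaml`:

```python
import inspect

# The factory's parameters should match the component's declared inputs
# (query, project_id, dataset_id, table_id, output_gcs_path, ...).
print(inspect.signature(bigquery_query_op))
```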

### Define a pipeline that uses the component


```python
import kfp.dsl as dsl
import kfp.gcp as gcp


@dsl.pipeline(
    name='BigQuery query pipeline',
    description='BigQuery query pipeline'
)
def pipeline(
    query,
    project_id,
    dataset_id='',
    table_id='',
    output_gcs_path='',
    dataset_location='US',
    job_config=''
):
    # Mount the GCP service-account secret so the component can
    # authenticate to BigQuery and GCS.
    bigquery_query_op(query, project_id, dataset_id, table_id, output_gcs_path,
                      dataset_location, job_config).apply(
                          gcp.use_gcp_secret('user-gcp-sa'))
```

### Compile the pipeline


```python
import kfp.compiler as compiler

pipeline_func = pipeline
pipeline_filename = pipeline_func.__name__ + '.pipeline.tar.gz'
compiler.Compiler().compile(pipeline_func, pipeline_filename)
```

### Submit the pipeline for execution


```python
# Specify pipeline argument values
arguments = {
'query': 'SELECT * FROM `bigquery-public-data.stackoverflow.posts_questions` LIMIT 10',
'project_id': PROJECT_ID,
'output_gcs_path': '{}/bigquery/query/questions.csv'.format(GCS_WORKING_DIR)
}

# Get or create an experiment
import kfp
client = kfp.Client()
experiment = client.create_experiment(EXPERIMENT_NAME)

# Submit a pipeline run
run_name = pipeline_func.__name__ + ' run'
run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename, arguments)
```
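
Optionally, block until the run finishes and read back the dumped CSV. This is a sketch: `wait_for_run_completion` and the `id` attribute on the run result are assumed from the KFP client API, and reading `gs://` paths with pandas assumes a GCS-aware filesystem layer such as `gcsfs` is installed:

```python
# Wait for the run to finish (timeout in seconds).
client.wait_for_run_completion(run_result.id, timeout=1200)

# Load the query results that the component dumped to GCS.
import pandas as pd
df = pd.read_csv('{}/bigquery/query/questions.csv'.format(GCS_WORKING_DIR))
print(df.head())
```
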
2 changes: 1 addition & 1 deletion components/gcp/bigquery/query/component.yaml
@@ -26,7 +26,7 @@ outputs:
- {name: output_gcs_path, description: 'The GCS blob path to dump the query results to.'}
implementation:
container:
-    image: gcr.io/ml-pipeline-dogfood/ml-pipeline-gcp:latest
+    image: gcr.io/ml-pipeline/ml-pipeline-gcp:latest
args: [
kfp_component.google.bigquery, query,
--query, {inputValue: query},
207 changes: 207 additions & 0 deletions components/gcp/bigquery/query/sample.ipynb
@@ -0,0 +1,207 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Bigquery - Query\n",
"\n",
"## Intended Use\n",
"A Kubeflow Pipeline component to submit a query to Google Cloud Bigquery service and dump outputs to a Google Cloud Storage blob. \n",
"\n",
"## Input:\n",
"Name | Description\n",
":--- | :----------\n",
"query | The query used by Bigquery service to fetch the results.\n",
"project_id | The project to execute the query job.\n",
"dataset_id | The ID of the persistent dataset to keep the results of the query. If the dataset does not exist, the operation will create a new one.\n",
"table_id | The ID of the table to keep the results of the query. If absent, the operation will generate a random id for the table.\n",
"output_gcs_path | The GCS blob path to dump the query results to.\n",
"dataset_location | The location to create the dataset. Defaults to `US`.\n",
"job_config | The full config spec for the query job. See [QueryJobConfig](https://googleapis.github.io/google-cloud-python/latest/bigquery/generated/google.cloud.bigquery.job.QueryJobConfig.html#google.cloud.bigquery.job.QueryJobConfig) for details.\n",
"\n",
"## Output:\n",
"Name | Description\n",
":--- | :----------\n",
"output_gcs_path | The GCS blob path to dump the query results to."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Sample\n",
"\n",
"Note: the sample code below works in both IPython notebook or python code directly."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Set sample parameters"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": [
"parameters"
]
},
"outputs": [],
"source": [
"# Required Parameters\n",
"PROJECT_ID = '<Please put your project ID here>'\n",
"GCS_WORKING_DIR = 'gs://<Please put your GCS path here>' # No ending slash\n",
"\n",
"# Optional Parameters\n",
"EXPERIMENT_NAME = 'Bigquery -Query'\n",
"COMPONENT_SPEC_URI = 'https://raw.githubusercontent.com/kubeflow/pipelines/master/components/gcp/bigquery/query/component.yaml'"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Install KFP SDK"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"scrolled": true
},
"outputs": [],
"source": [
"# Install the SDK (Uncomment the code if the SDK is not installed before)\n",
"# KFP_PACKAGE = 'https://storage.googleapis.com/ml-pipeline/release/0.1.11/kfp.tar.gz'\n",
"# !pip3 install $KFP_PACKAGE --upgrade"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Load component definitions"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import kfp.components as comp\n",
"\n",
"bigquery_query_op = comp.load_component_from_url(COMPONENT_SPEC_URI)\n",
"display(bigquery_query_op)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Run the component as a single pipeline"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import kfp.dsl as dsl\n",
"import kfp.gcp as gcp\n",
"import json\n",
"@dsl.pipeline(\n",
" name='Bigquery query pipeline',\n",
" description='Bigquery query pipeline'\n",
")\n",
"def pipeline(\n",
" query, \n",
" project_id, \n",
" dataset_id='', \n",
" table_id='', \n",
" output_gcs_path='', \n",
" dataset_location='US', \n",
" job_config=''\n",
"):\n",
" bigquery_query_op(query, project_id, dataset_id, table_id, output_gcs_path, dataset_location, \n",
" job_config).apply(gcp.use_gcp_secret('user-gcp-sa'))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Compile the pipeline"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"pipeline_func = pipeline\n",
"pipeline_filename = pipeline_func.__name__ + '.pipeline.tar.gz'\n",
"import kfp.compiler as compiler\n",
"compiler.Compiler().compile(pipeline_func, pipeline_filename)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Submit the pipeline for execution"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#Specify pipeline argument values\n",
"arguments = {\n",
" 'query': 'SELECT * FROM `bigquery-public-data.stackoverflow.posts_questions` LIMIT 10',\n",
" 'project_id': PROJECT_ID,\n",
" 'output_gcs_path': '{}/bigquery/query/questions.csv'.format(GCS_WORKING_DIR)\n",
"}\n",
"\n",
"#Get or create an experiment and submit a pipeline run\n",
"import kfp\n",
"client = kfp.Client()\n",
"experiment = client.create_experiment(EXPERIMENT_NAME)\n",
"\n",
"#Submit a pipeline run\n",
"run_name = pipeline_func.__name__ + ' run'\n",
"run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename, arguments)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.4"
}
},
"nbformat": 4,
"nbformat_minor": 2
}