Bump TFX dependency to 0.21.0rc0 #2949

Merged · 5 commits · Feb 1, 2020
2 changes: 1 addition & 1 deletion backend/Dockerfile
@@ -28,7 +28,7 @@ FROM python:3.5 as compiler
RUN apt-get update -y && \
apt-get install --no-install-recommends -y -q default-jdk python3-setuptools python3-dev
RUN wget https://bootstrap.pypa.io/get-pip.py && python3 get-pip.py
RUN python3 -m pip install apache-beam[gcp]==2.17 pyarrow==0.14.1 tfx==0.15.0
RUN python3 -m pip install tfx==0.21.0rc0

WORKDIR /go/src/github.com/kubeflow/pipelines
COPY sdk sdk
48 changes: 27 additions & 21 deletions samples/core/parameterized_tfx_oss/parameterized_tfx_oss.py
@@ -28,6 +28,7 @@
from tfx.components.statistics_gen.component import StatisticsGen
from tfx.components.trainer.component import Trainer
from tfx.components.transform.component import Transform
from tfx.orchestration import data_types
from tfx.orchestration import pipeline
from tfx.orchestration.kubeflow import kubeflow_dag_runner
from tfx.proto import evaluator_pb2
@@ -37,14 +38,18 @@

# Define pipeline params used for pipeline execution.
# Path to the module file, should be a GCS path.
_taxi_module_file_param = dsl.PipelineParam(
_taxi_module_file_param = data_types.RuntimeParameter(
name='module-file',
value='gs://ml-pipeline-playground/tfx_taxi_simple/modules/taxi_utils.py'
default=
'gs://ml-pipeline-playground/tfx_taxi_simple/modules/tfx_taxi_utils_1205.py',
ptype=Text,
)

# Path to the CSV data file, under which there should be a data.csv file.
_data_root_param = dsl.PipelineParam(
name='data-root', value='gs://ml-pipeline-playground/tfx_taxi_simple/data'
_data_root_param = data_types.RuntimeParameter(
name='data-root',
default='gs://ml-pipeline-playground/tfx_taxi_simple/data',
ptype=Text,
)

# Path of pipeline root, should be a GCS path.
@@ -54,8 +59,8 @@


def _create_test_pipeline(
pipeline_root: Text, csv_input_location: Text, taxi_module_file: Text,
enable_cache: bool
pipeline_root: Text, csv_input_location: data_types.RuntimeParameter,
taxi_module_file: data_types.RuntimeParameter, enable_cache: bool
):
"""Creates a simple Kubeflow-based Chicago Taxi TFX pipeline.

@@ -71,31 +76,31 @@ def _create_test_pipeline(
examples = external_input(csv_input_location)

example_gen = CsvExampleGen(input=examples)
statistics_gen = StatisticsGen(input_data=example_gen.outputs['examples'])
statistics_gen = StatisticsGen(examples=example_gen.outputs['examples'])
infer_schema = SchemaGen(
stats=statistics_gen.outputs['statistics'],
statistics=statistics_gen.outputs['statistics'],
infer_feature_shape=False,
)
validate_stats = ExampleValidator(
stats=statistics_gen.outputs['statistics'],
statistics=statistics_gen.outputs['statistics'],
schema=infer_schema.outputs['schema'],
)
transform = Transform(
input_data=example_gen.outputs['examples'],
examples=example_gen.outputs['examples'],
schema=infer_schema.outputs['schema'],
module_file=taxi_module_file,
)
trainer = Trainer(
module_file=taxi_module_file,
transformed_examples=transform.outputs['transformed_examples'],
schema=infer_schema.outputs['schema'],
transform_output=transform.outputs['transform_graph'],
transform_graph=transform.outputs['transform_graph'],
train_args=trainer_pb2.TrainArgs(num_steps=10),
eval_args=trainer_pb2.EvalArgs(num_steps=5),
)
model_analyzer = Evaluator(
examples=example_gen.outputs['examples'],
model_exports=trainer.outputs['model'],
model=trainer.outputs['model'],
feature_slicing_spec=evaluator_pb2.FeatureSlicingSpec(
specs=[
evaluator_pb2.SingleSlicingSpec(
@@ -114,7 +119,7 @@ def _create_test_pipeline(
# https://github.com/tensorflow/tfx/blob/1c670e92143c7856f67a866f721b8a9368ede385/tfx/orchestration/kubeflow/kubeflow_dag_runner.py#L226
_pipeline_root_param = dsl.PipelineParam(name='pipeline-root')
pusher = Pusher(
model_export=trainer.outputs['model'],
model=trainer.outputs['model'],
model_blessing=model_validator.outputs['blessing'],
push_destination=pusher_pb2.PushDestination(
filesystem=pusher_pb2.PushDestination.Filesystem(
@@ -140,8 +145,8 @@ def _create_test_pipeline(
enable_cache = True
pipeline = _create_test_pipeline(
pipeline_root,
str(_data_root_param),
str(_taxi_module_file_param),
_data_root_param,
_taxi_module_file_param,
enable_cache=enable_cache,
)
# Make sure the version of TFX image used is consistent with the version of
@@ -150,15 +155,16 @@ def _create_test_pipeline(
kubeflow_metadata_config=kubeflow_dag_runner.
get_default_kubeflow_metadata_config(),
# TODO: remove this override when KubeflowDagRunnerConfig doesn't default to use_gcp_secret op.
pipeline_operator_funcs=list(filter(
lambda operator: operator.__name__.find('gcp_secret') == -1,
kubeflow_dag_runner.get_default_pipeline_operator_funcs())),
tfx_image='tensorflow/tfx:0.15.0',
pipeline_operator_funcs=list(
filter(
lambda operator: operator.__name__.find('gcp_secret') == -1,
kubeflow_dag_runner.get_default_pipeline_operator_funcs()
)
),
tfx_image='tensorflow/tfx:0.21.0rc0',
)
kfp_runner = kubeflow_dag_runner.KubeflowDagRunner(
output_filename=__file__ + '.yaml', config=config
)
# Make sure kfp_runner recognizes those parameters.
kfp_runner._params.extend([_data_root_param, _taxi_module_file_param])

kfp_runner.run(pipeline)
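
For context (not part of this diff): compiling the sample above writes `parameterized_tfx_oss.py.yaml`, and the two `RuntimeParameter` objects surface as KFP pipeline parameters named `data-root` and `module-file`, alongside the implicit `pipeline-root`. A minimal sketch of submitting that package with the parameters overridden; the endpoint and bucket below are placeholders:

import kfp
from kfp import dsl

client = kfp.Client(host='<your KFP endpoint>')  # placeholder endpoint
client.create_run_from_pipeline_package(
    'parameterized_tfx_oss.py.yaml',  # package produced by kfp_runner above
    arguments={
        # pipeline-root is always a runtime parameter on KFP; RUN_ID_PLACEHOLDER
        # gives each run its own root directory.
        'pipeline-root': 'gs://<your-bucket>/tfx_taxi_simple/' + dsl.RUN_ID_PLACEHOLDER,
        # Omit the next two entries to fall back to the defaults declared above.
        'module-file': 'gs://<your-bucket>/taxi_utils.py',
        'data-root': 'gs://<your-bucket>/tfx_taxi_simple/data',
    },
)
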
54 changes: 25 additions & 29 deletions samples/core/parameterized_tfx_oss/taxi_pipeline_notebook.ipynb
@@ -18,7 +18,7 @@
"\n",
"This pipeline requires Google Cloud Storage permission to run. \n",
"If KFP was deployed through K8S marketplace, please follow instructions in [the guideline](https://github.com/kubeflow/pipelines/blob/master/manifests/gcp_marketplace/guide.md#gcp-service-account-credentials)\n",
"to make sure the service account has `storage.admin` role."
"to make sure that the GCP full scope access was enabled when creating the cluster, or at least, that the service account has `storage.admin` role."
]
},
{
@@ -29,28 +29,19 @@
"source": [
"!python3 -m pip install pip --upgrade --quiet --user\n",
"!python3 -m pip install kfp --upgrade --quiet --user\n",
"!python3 -m pip install tfx --upgrade --quiet --user"
"!python3 -m pip install tfx==0.21.0rc0 --quiet --user"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"In this example we'll need a very recent version of TFX SDK to leverage the [`RuntimeParameter`](https://github.com/tensorflow/tfx/blob/93ea0b4eda5a6000a07a1e93d93a26441094b6f5/tfx/orchestration/data_types.py#L137) feature.\n",
"In this example we'll need TFX SDK later than 0.21 to leverage the [`RuntimeParameter`](https://github.com/tensorflow/tfx/blob/93ea0b4eda5a6000a07a1e93d93a26441094b6f5/tfx/orchestration/data_types.py#L137) feature.\n",
"\n",
"## RuntimeParameter in TFX DSL\n",
"Currently, TFX DSL only supports parameterizing field in the `PARAMETERS` section of `ComponentSpec`, see [here](https://github.com/tensorflow/tfx/blob/93ea0b4eda5a6000a07a1e93d93a26441094b6f5/tfx/types/component_spec.py#L126). This prevents runtime-parameterizing the pipeline topology. Also, if the declared type of the field is a protobuf, the user needs to pass in a dictionary with exactly the same names for each field, and specify one or more value as `RuntimeParameter` objects. In other word, the dictionary should be able to be passed in to [`ParseDict()` method](https://github.com/protocolbuffers/protobuf/blob/04a11fc91668884d1793bff2a0f72ee6ce4f5edd/python/google/protobuf/json_format.py#L433) and produce the correct pb message."
]
},
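
For illustration (not part of this diff; the parameter name and default below are assumed rather than taken from the sample), the pattern looks like this: a plain `PARAMETERS` field declared as a `RuntimeParameter`, and a protobuf-typed field such as the Evaluator's `feature_slicing_spec` supplied as a `ParseDict()`-compatible dict whose leaf values may themselves be `RuntimeParameter` objects:

from typing import Text
from tfx.orchestration import data_types

slicing_column = data_types.RuntimeParameter(
    name='slicing-column',
    default='trip_start_hour',  # assumed default, for illustration only
    ptype=Text,
)

# The dict mirrors evaluator_pb2.FeatureSlicingSpec field names so that
# json_format.ParseDict() can turn it into the proto message at run time.
feature_slicing_spec = dict(specs=[{'column_for_slicing': [slicing_column]}])

The `Evaluator` cell later in this notebook passes `feature_slicing_spec` in exactly this dictionary form.
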
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"!python3 -m pip install --quiet --index-url https://test.pypi.org/simple/ tfx==0.16.0.dev20191212 --user"
]
},
{
"cell_type": "code",
"execution_count": null,
@@ -92,6 +83,7 @@
"\n",
"# Path of pipeline data root, should be a GCS path.\n",
"# Note that when running on KFP, the pipeline root is always a runtime parameter.\n",
"# The value specified here will be its default.\n",
"pipeline_root = os.path.join('gs://my-bucket', 'tfx_taxi_simple',\n",
" kfp.dsl.RUN_ID_PLACEHOLDER)\n",
"\n",
@@ -160,7 +152,7 @@
"metadata": {},
"outputs": [],
"source": [
"statistics_gen = StatisticsGen(input_data=example_gen.outputs['examples'])"
"statistics_gen = StatisticsGen(examples=example_gen.outputs['examples'])"
]
},
{
@@ -170,7 +162,7 @@
"outputs": [],
"source": [
"infer_schema = SchemaGen(\n",
" stats=statistics_gen.outputs['statistics'], infer_feature_shape=False)"
" statistics=statistics_gen.outputs['statistics'], infer_feature_shape=False)"
]
},
{
@@ -180,7 +172,7 @@
"outputs": [],
"source": [
"validate_stats = ExampleValidator(\n",
" stats=statistics_gen.outputs['statistics'],\n",
" statistics=statistics_gen.outputs['statistics'],\n",
" schema=infer_schema.outputs['schema'])"
]
},
@@ -193,7 +185,7 @@
"# The module file used in Transform and Trainer component is paramterized by\n",
"# _taxi_module_file_param.\n",
"transform = Transform(\n",
" input_data=example_gen.outputs['examples'],\n",
" examples=example_gen.outputs['examples'],\n",
" schema=infer_schema.outputs['schema'],\n",
" module_file=taxi_module_file_param)"
]
@@ -210,7 +202,7 @@
" module_file=taxi_module_file_param,\n",
" transformed_examples=transform.outputs['transformed_examples'],\n",
" schema=infer_schema.outputs['schema'],\n",
" transform_output=transform.outputs['transform_graph'],\n",
" transform_graph=transform.outputs['transform_graph'],\n",
" train_args={'num_steps': train_steps},\n",
" eval_args={'num_steps': eval_steps})"
]
@@ -224,7 +216,7 @@
"# The name of slicing column is specified as a RuntimeParameter.\n",
"model_analyzer = Evaluator(\n",
" examples=example_gen.outputs['examples'],\n",
" model_exports=trainer.outputs['model'],\n",
" model=trainer.outputs['model'],\n",
" feature_slicing_spec=dict(specs=[{\n",
" 'column_for_slicing': [slicing_column]\n",
" }]))"
@@ -246,19 +238,18 @@
"metadata": {},
"outputs": [],
"source": [
"# Currently we use this hack to ensure push_destination can\n",
"# be correctly parameterized and interpreted.\n",
"# Hack: ensuring push_destination can be correctly parameterized and interpreted.\n",
"# pipeline root will be specified as a dsl.PipelineParam with the name\n",
"# pipeline-root, see:\n",
"# https://github.com/tensorflow/tfx/blob/1c670e92143c7856f67a866f721b8a9368ede385/tfx/orchestration/kubeflow/kubeflow_dag_runner.py#L226\n",
"pipeline_root_param = dsl.PipelineParam(name='pipeline-root')\n",
"_pipeline_root_param = dsl.PipelineParam(name='pipeline-root')\n",
"pusher = Pusher(\n",
" model_export=trainer.outputs['model'],\n",
" model=trainer.outputs['model'],\n",
" model_blessing=model_validator.outputs['blessing'],\n",
" push_destination=pusher_pb2.PushDestination(\n",
" filesystem=pusher_pb2.PushDestination.Filesystem(\n",
" base_directory=os.path.join(\n",
" str(pipeline_root_param), 'model_serving'))))\n"
" str(_pipeline_root_param), 'model_serving'))))\n"
]
},
{
@@ -290,10 +281,15 @@
"source": [
"# Specify a TFX docker image. For the full list of tags please see:\n",
"# https://hub.docker.com/r/tensorflow/tfx/tags\n",
"tfx_image = 'tensorflow/tfx:0.16.0.dev20191205'\n",
"tfx_image = 'tensorflow/tfx:0.21.0rc0'\n",
"config = kubeflow_dag_runner.KubeflowDagRunnerConfig(\n",
" kubeflow_metadata_config=kubeflow_dag_runner\n",
" .get_default_kubeflow_metadata_config(),\n",
" # Switch to use GCP service account by deleting the next line,\n",
" # if KFP is operating workload identity, or with GCP full scope permission.\n",
" pipeline_operator_funcs=list(filter(\n",
" lambda operator: operator.__name__.find('gcp_secret') == -1,\n",
" kubeflow_dag_runner.get_default_pipeline_operator_funcs())),\n",
" tfx_image=tfx_image)\n",
"kfp_runner = kubeflow_dag_runner.KubeflowDagRunner(config=config)\n",
"# KubeflowDagRunner compiles the DSL pipeline object into KFP pipeline package.\n",
@@ -308,13 +304,13 @@
"outputs": [],
"source": [
"run_result = kfp.Client(\n",
" host='<KFP end point>'\n",
" host='1234567abcde-dot-us-central2.pipelines.googleusercontent.com' # Put your KFP endpoint here\n",
").create_run_from_pipeline_package(\n",
" pipeline_name + '.tar.gz', \n",
" arguments={\n",
" 'pipeline-root': '<gcs path to root>' + kfp.dsl.RUN_ID_PLACEHOLDER,\n",
" 'module-file': '<gcs path to the module file>', # delete this line to use default module file.\n",
" 'data-root': '<gcs path to the data>' # delete this line to use default data.\n",
" 'pipeline-root': 'gs://<your-gcs-bucket>/tfx_taxi_simple/' + kfp.dsl.RUN_ID_PLACEHOLDER,\n",
" # 'module-file': '<gcs path to the module file>', # delete this line to use default module file.\n",
" # 'data-root': '<gcs path to the data>' # delete this line to use default data.\n",
"})"
]
}
@@ -335,7 +331,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.5.3"
"version": "3.7.5rc1"
}
},
"nbformat": 4,
2 changes: 1 addition & 1 deletion test/sample-test/Dockerfile
@@ -19,7 +19,7 @@ RUN pip3 install google-api-python-client==1.7.0
RUN pip3 install google-cloud-storage==1.17.0
RUN pip3 install fire==0.2.1
RUN pip3 install yamale==2.0
RUN pip3 install apache-beam[gcp]==2.17 pyarrow==0.14.1 tfx==0.15.0
RUN pip3 install tfx==0.21.0rc0

# Install python client, including DSL compiler.
COPY ./sdk/python /sdk/python