Bump TFX dependency to 0.21.0rc0 (kubeflow#2949)
* bump version and modernize

* modernize

* modernize

* unpin pyarrow
Jiaxiao Zheng authored and Jeffwan committed Dec 9, 2020
1 parent 821eb86 commit a29814b
Showing 4 changed files with 54 additions and 52 deletions.
2 changes: 1 addition & 1 deletion backend/Dockerfile
@@ -28,7 +28,7 @@ FROM python:3.5 as compiler
RUN apt-get update -y && \
apt-get install --no-install-recommends -y -q default-jdk python3-setuptools python3-dev
RUN wget https://bootstrap.pypa.io/get-pip.py && python3 get-pip.py
-RUN python3 -m pip install apache-beam[gcp]==2.17 pyarrow==0.14.1 tfx==0.15.0
+RUN python3 -m pip install tfx==0.21.0rc0

WORKDIR /go/src/github.com/kubeflow/pipelines
COPY sdk sdk
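Since the change above drops the explicit apache-beam and pyarrow pins and lets tfx==0.21.0rc0 resolve its own dependencies, it can be worth confirming what actually got installed in the built image. A minimal sketch, not part of this commit, assuming the packages install under their usual distribution names:

import pkg_resources

# Print the versions pip resolved for the packages this commit unpins.
for name in ('tfx', 'apache-beam', 'pyarrow'):
    try:
        print(name, pkg_resources.get_distribution(name).version)
    except pkg_resources.DistributionNotFound:
        print(name, 'not installed')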
48 changes: 27 additions & 21 deletions samples/core/parameterized_tfx_oss/parameterized_tfx_oss.py
@@ -28,6 +28,7 @@
from tfx.components.statistics_gen.component import StatisticsGen
from tfx.components.trainer.component import Trainer
from tfx.components.transform.component import Transform
+from tfx.orchestration import data_types
from tfx.orchestration import pipeline
from tfx.orchestration.kubeflow import kubeflow_dag_runner
from tfx.proto import evaluator_pb2
@@ -37,14 +38,18 @@

# Define pipeline params used for pipeline execution.
# Path to the module file, should be a GCS path.
-_taxi_module_file_param = dsl.PipelineParam(
+_taxi_module_file_param = data_types.RuntimeParameter(
name='module-file',
-value='gs://ml-pipeline-playground/tfx_taxi_simple/modules/taxi_utils.py'
+default=
+'gs://ml-pipeline-playground/tfx_taxi_simple/modules/tfx_taxi_utils_1205.py',
+ptype=Text,
)

# Path to the CSV data file, under which there should be a data.csv file.
-_data_root_param = dsl.PipelineParam(
-name='data-root', value='gs://ml-pipeline-playground/tfx_taxi_simple/data'
+_data_root_param = data_types.RuntimeParameter(
+name='data-root',
+default='gs://ml-pipeline-playground/tfx_taxi_simple/data',
+ptype=Text,
)

# Path of pipeline root, should be a GCS path.
@@ -54,8 +59,8 @@


def _create_test_pipeline(
-pipeline_root: Text, csv_input_location: Text, taxi_module_file: Text,
-enable_cache: bool
+pipeline_root: Text, csv_input_location: data_types.RuntimeParameter,
+taxi_module_file: data_types.RuntimeParameter, enable_cache: bool
):
"""Creates a simple Kubeflow-based Chicago Taxi TFX pipeline.
@@ -71,31 +76,31 @@ def _create_test_pipeline(
examples = external_input(csv_input_location)

example_gen = CsvExampleGen(input=examples)
-statistics_gen = StatisticsGen(input_data=example_gen.outputs['examples'])
+statistics_gen = StatisticsGen(examples=example_gen.outputs['examples'])
infer_schema = SchemaGen(
-stats=statistics_gen.outputs['statistics'],
+statistics=statistics_gen.outputs['statistics'],
infer_feature_shape=False,
)
validate_stats = ExampleValidator(
-stats=statistics_gen.outputs['statistics'],
+statistics=statistics_gen.outputs['statistics'],
schema=infer_schema.outputs['schema'],
)
transform = Transform(
-input_data=example_gen.outputs['examples'],
+examples=example_gen.outputs['examples'],
schema=infer_schema.outputs['schema'],
module_file=taxi_module_file,
)
trainer = Trainer(
module_file=taxi_module_file,
transformed_examples=transform.outputs['transformed_examples'],
schema=infer_schema.outputs['schema'],
-transform_output=transform.outputs['transform_graph'],
+transform_graph=transform.outputs['transform_graph'],
train_args=trainer_pb2.TrainArgs(num_steps=10),
eval_args=trainer_pb2.EvalArgs(num_steps=5),
)
model_analyzer = Evaluator(
examples=example_gen.outputs['examples'],
-model_exports=trainer.outputs['model'],
+model=trainer.outputs['model'],
feature_slicing_spec=evaluator_pb2.FeatureSlicingSpec(
specs=[
evaluator_pb2.SingleSlicingSpec(
@@ -114,7 +119,7 @@ def _create_test_pipeline(
# https://github.com/tensorflow/tfx/blob/1c670e92143c7856f67a866f721b8a9368ede385/tfx/orchestration/kubeflow/kubeflow_dag_runner.py#L226
_pipeline_root_param = dsl.PipelineParam(name='pipeline-root')
pusher = Pusher(
-model_export=trainer.outputs['model'],
+model=trainer.outputs['model'],
model_blessing=model_validator.outputs['blessing'],
push_destination=pusher_pb2.PushDestination(
filesystem=pusher_pb2.PushDestination.Filesystem(
@@ -140,8 +145,8 @@ def _create_test_pipeline(
enable_cache = True
pipeline = _create_test_pipeline(
pipeline_root,
-str(_data_root_param),
-str(_taxi_module_file_param),
+_data_root_param,
+_taxi_module_file_param,
enable_cache=enable_cache,
)
# Make sure the version of TFX image used is consistent with the version of
@@ -150,15 +155,16 @@ def _create_test_pipeline(
kubeflow_metadata_config=kubeflow_dag_runner.
get_default_kubeflow_metadata_config(),
# TODO: remove this override when KubeflowDagRunnerConfig doesn't default to use_gcp_secret op.
-pipeline_operator_funcs=list(filter(
-lambda operator: operator.__name__.find('gcp_secret') == -1,
-kubeflow_dag_runner.get_default_pipeline_operator_funcs())),
-tfx_image='tensorflow/tfx:0.15.0',
+pipeline_operator_funcs=list(
+filter(
+lambda operator: operator.__name__.find('gcp_secret') == -1,
+kubeflow_dag_runner.get_default_pipeline_operator_funcs()
+)
+),
+tfx_image='tensorflow/tfx:0.21.0rc0',
)
kfp_runner = kubeflow_dag_runner.KubeflowDagRunner(
output_filename=__file__ + '.yaml', config=config
)
-# Make sure kfp_runner recognizes those parameters.
-kfp_runner._params.extend([_data_root_param, _taxi_module_file_param])

kfp_runner.run(pipeline)
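Taken together, the edits above replace compile-time `dsl.PipelineParam` values (previously stringified into component arguments) with TFX `RuntimeParameter` objects that KFP resolves when a run is created, which is also why the `kfp_runner._params.extend` hack disappears. A minimal sketch of the new declaration style, using the same APIs as the diff (the name and default below are the sample's values, not requirements):

from typing import Text

from tfx.orchestration import data_types

# Resolved per run from the KFP UI or API arguments, not at compile time.
module_file_param = data_types.RuntimeParameter(
    name='module-file',
    default='gs://ml-pipeline-playground/tfx_taxi_simple/modules/tfx_taxi_utils_1205.py',
    ptype=Text,
)

# Components now take the parameter object directly, e.g.
# Transform(..., module_file=module_file_param), instead of str(module_file_param).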
54 changes: 25 additions & 29 deletions samples/core/parameterized_tfx_oss/taxi_pipeline_notebook.ipynb
@@ -18,7 +18,7 @@
"\n",
"This pipeline requires Google Cloud Storage permission to run. \n",
"If KFP was deployed through K8S marketplace, please follow instructions in [the guideline](https://github.com/kubeflow/pipelines/blob/master/manifests/gcp_marketplace/guide.md#gcp-service-account-credentials)\n",
"to make sure the service account has `storage.admin` role."
"to make sure that the GCP full scope access was enabled when creating the cluster, or at least, that the service account has `storage.admin` role."
]
},
{
@@ -29,28 +29,19 @@
"source": [
"!python3 -m pip install pip --upgrade --quiet --user\n",
"!python3 -m pip install kfp --upgrade --quiet --user\n",
"!python3 -m pip install tfx --upgrade --quiet --user"
"!python3 -m pip install tfx==0.21.0rc0 --quiet --user"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"In this example we'll need a very recent version of TFX SDK to leverage the [`RuntimeParameter`](https://github.com/tensorflow/tfx/blob/93ea0b4eda5a6000a07a1e93d93a26441094b6f5/tfx/orchestration/data_types.py#L137) feature.\n",
"In this example we'll need TFX SDK later than 0.21 to leverage the [`RuntimeParameter`](https://github.com/tensorflow/tfx/blob/93ea0b4eda5a6000a07a1e93d93a26441094b6f5/tfx/orchestration/data_types.py#L137) feature.\n",
"\n",
"## RuntimeParameter in TFX DSL\n",
"Currently, TFX DSL only supports parameterizing field in the `PARAMETERS` section of `ComponentSpec`, see [here](https://github.com/tensorflow/tfx/blob/93ea0b4eda5a6000a07a1e93d93a26441094b6f5/tfx/types/component_spec.py#L126). This prevents runtime-parameterizing the pipeline topology. Also, if the declared type of the field is a protobuf, the user needs to pass in a dictionary with exactly the same names for each field, and specify one or more value as `RuntimeParameter` objects. In other word, the dictionary should be able to be passed in to [`ParseDict()` method](https://github.com/protocolbuffers/protobuf/blob/04a11fc91668884d1793bff2a0f72ee6ce4f5edd/python/google/protobuf/json_format.py#L433) and produce the correct pb message."
]
},
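To make the `ParseDict()` constraint concrete: the dictionary supplied for a proto-typed field must mirror the proto's field names exactly, which is what lets a `RuntimeParameter` stand in for a leaf value (as `train_args={'num_steps': train_steps}` does later in this notebook). A minimal check of the dict-to-proto round trip, assuming tfx==0.21.0rc0 is installed:

from google.protobuf import json_format
from tfx.proto import trainer_pb2

# A key that doesn't match a TrainArgs field would raise json_format.ParseError.
train_args = json_format.ParseDict({'num_steps': 10}, trainer_pb2.TrainArgs())
print(train_args.num_steps)  # 10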
-{
-"cell_type": "code",
-"execution_count": null,
-"metadata": {},
-"outputs": [],
-"source": [
-"!python3 -m pip install --quiet --index-url https://test.pypi.org/simple/ tfx==0.16.0.dev20191212 --user"
-]
-},
{
"cell_type": "code",
"execution_count": null,
@@ -92,6 +83,7 @@
"\n",
"# Path of pipeline data root, should be a GCS path.\n",
"# Note that when running on KFP, the pipeline root is always a runtime parameter.\n",
"# The value specified here will be its default.\n",
"pipeline_root = os.path.join('gs://my-bucket', 'tfx_taxi_simple',\n",
" kfp.dsl.RUN_ID_PLACEHOLDER)\n",
"\n",
@@ -160,7 +152,7 @@
"metadata": {},
"outputs": [],
"source": [
"statistics_gen = StatisticsGen(input_data=example_gen.outputs['examples'])"
"statistics_gen = StatisticsGen(examples=example_gen.outputs['examples'])"
]
},
{
@@ -170,7 +162,7 @@
"outputs": [],
"source": [
"infer_schema = SchemaGen(\n",
" stats=statistics_gen.outputs['statistics'], infer_feature_shape=False)"
" statistics=statistics_gen.outputs['statistics'], infer_feature_shape=False)"
]
},
{
@@ -180,7 +172,7 @@
"outputs": [],
"source": [
"validate_stats = ExampleValidator(\n",
" stats=statistics_gen.outputs['statistics'],\n",
" statistics=statistics_gen.outputs['statistics'],\n",
" schema=infer_schema.outputs['schema'])"
]
},
@@ -193,7 +185,7 @@
"# The module file used in Transform and Trainer component is paramterized by\n",
"# _taxi_module_file_param.\n",
"transform = Transform(\n",
" input_data=example_gen.outputs['examples'],\n",
" examples=example_gen.outputs['examples'],\n",
" schema=infer_schema.outputs['schema'],\n",
" module_file=taxi_module_file_param)"
]
@@ -210,7 +202,7 @@
" module_file=taxi_module_file_param,\n",
" transformed_examples=transform.outputs['transformed_examples'],\n",
" schema=infer_schema.outputs['schema'],\n",
" transform_output=transform.outputs['transform_graph'],\n",
" transform_graph=transform.outputs['transform_graph'],\n",
" train_args={'num_steps': train_steps},\n",
" eval_args={'num_steps': eval_steps})"
]
@@ -224,7 +216,7 @@
"# The name of slicing column is specified as a RuntimeParameter.\n",
"model_analyzer = Evaluator(\n",
" examples=example_gen.outputs['examples'],\n",
" model_exports=trainer.outputs['model'],\n",
" model=trainer.outputs['model'],\n",
" feature_slicing_spec=dict(specs=[{\n",
" 'column_for_slicing': [slicing_column]\n",
" }]))"
@@ -246,19 +238,18 @@
"metadata": {},
"outputs": [],
"source": [
"# Currently we use this hack to ensure push_destination can\n",
"# be correctly parameterized and interpreted.\n",
"# Hack: ensuring push_destination can be correctly parameterized and interpreted.\n",
"# pipeline root will be specified as a dsl.PipelineParam with the name\n",
"# pipeline-root, see:\n",
"# https://github.com/tensorflow/tfx/blob/1c670e92143c7856f67a866f721b8a9368ede385/tfx/orchestration/kubeflow/kubeflow_dag_runner.py#L226\n",
"pipeline_root_param = dsl.PipelineParam(name='pipeline-root')\n",
"_pipeline_root_param = dsl.PipelineParam(name='pipeline-root')\n",
"pusher = Pusher(\n",
" model_export=trainer.outputs['model'],\n",
" model=trainer.outputs['model'],\n",
" model_blessing=model_validator.outputs['blessing'],\n",
" push_destination=pusher_pb2.PushDestination(\n",
" filesystem=pusher_pb2.PushDestination.Filesystem(\n",
" base_directory=os.path.join(\n",
" str(pipeline_root_param), 'model_serving'))))\n"
" str(_pipeline_root_param), 'model_serving'))))\n"
]
},
{
@@ -290,10 +281,15 @@
"source": [
"# Specify a TFX docker image. For the full list of tags please see:\n",
"# https://hub.docker.com/r/tensorflow/tfx/tags\n",
"tfx_image = 'tensorflow/tfx:0.16.0.dev20191205'\n",
"tfx_image = 'tensorflow/tfx:0.21.0rc0'\n",
"config = kubeflow_dag_runner.KubeflowDagRunnerConfig(\n",
" kubeflow_metadata_config=kubeflow_dag_runner\n",
" .get_default_kubeflow_metadata_config(),\n",
" # Switch to use GCP service account by deleting the next line,\n",
" # if KFP is operating workload identity, or with GCP full scope permission.\n",
" pipeline_operator_funcs=list(filter(\n",
" lambda operator: operator.__name__.find('gcp_secret') == -1,\n",
" kubeflow_dag_runner.get_default_pipeline_operator_funcs())),\n",
" tfx_image=tfx_image)\n",
"kfp_runner = kubeflow_dag_runner.KubeflowDagRunner(config=config)\n",
"# KubeflowDagRunner compiles the DSL pipeline object into KFP pipeline package.\n",
@@ -308,13 +304,13 @@
"outputs": [],
"source": [
"run_result = kfp.Client(\n",
" host='<KFP end point>'\n",
" host='1234567abcde-dot-us-central2.pipelines.googleusercontent.com' # Put your KFP endpoint here\n",
").create_run_from_pipeline_package(\n",
" pipeline_name + '.tar.gz', \n",
" arguments={\n",
" 'pipeline-root': '<gcs path to root>' + kfp.dsl.RUN_ID_PLACEHOLDER,\n",
" 'module-file': '<gcs path to the module file>', # delete this line to use default module file.\n",
" 'data-root': '<gcs path to the data>' # delete this line to use default data.\n",
" 'pipeline-root': 'gs://<your-gcs-bucket>/tfx_taxi_simple/' + kfp.dsl.RUN_ID_PLACEHOLDER,\n",
" # 'module-file': '<gcs path to the module file>', # delete this line to use default module file.\n",
" # 'data-root': '<gcs path to the data>' # delete this line to use default data.\n",
"})"
]
}
@@ -335,7 +331,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.5.3"
"version": "3.7.5rc1"
}
},
"nbformat": 4,
2 changes: 1 addition & 1 deletion test/sample-test/Dockerfile
@@ -19,7 +19,7 @@ RUN pip3 install google-api-python-client==1.7.0
RUN pip3 install google-cloud-storage==1.17.0
RUN pip3 install fire==0.2.1
RUN pip3 install yamale==2.0
-RUN pip3 install apache-beam[gcp]==2.17 pyarrow==0.14.1 tfx==0.15.0
+RUN pip3 install tfx==0.21.0rc0

# Install python client, including DSL compiler.
COPY ./sdk/python /sdk/python
