From b63472062bd80737c7f39e0eda901db0fe23a5e0 Mon Sep 17 00:00:00 2001
From: Alexey Volkov
Date: Wed, 4 Dec 2019 17:52:31 -0800
Subject: [PATCH] Components - TFX (#2671)

* Added CsvExampleGen component

* Switched to using some processing code from the component class

Needs testing

* Renamed output_examples to example_artifacts for consistency with the original component

* Fixed the docstring a bit

* Added StatisticsGen

First draft

* Added SchemaGen

First draft

* Fixed the input_dict construction

* Use None defaults

* Switched to TFX container image

* Updated component definitions

* Fixed StatisticsGen and SchemaGen

Input artifacts must have splits.
Split URIs should end with '/'.
The components now work.
Also printing component_class_instance for debugging.

* Printing component instance in CsvExampleGen

* Moved components to directories

* Updated the sample TFX pipeline

* Renamed ExamplesPath to Examples for data passing components

* Corrected output_component_file paths

* Added the Transform component

The component uses almost completely generic code.

* Added the Trainer component

* Added the Evaluator component

* Added the ExampleValidator component

* Added the BigQueryExampleGen component

* Added the ImportExampleGen component

* Updated the sample

Added ExampleValidator, Transform, Trainer, Evaluator

* Upgraded to TFX 0.15.0

* Upgraded the sample to 0.15.0

* Silence Flake8 for annotations
---
 components/tfx/Evaluator/component.py         | 130 +++++++
 components/tfx/Evaluator/component.yaml       | 232 ++++++++++++
 .../BigQueryExampleGen/component.py           | 107 ++++++
 .../BigQueryExampleGen/component.yaml         | 193 ++++++++++
 .../tfx/ExampleGen/CsvExampleGen/component.py |  96 +++++
 .../ExampleGen/CsvExampleGen/component.yaml   | 172 +++++++++
 .../CsvExampleGen/with_URI_IO/component.py    |  98 +++++
 .../CsvExampleGen/with_URI_IO/component.yaml  | 166 +++++++++
 .../ExampleGen/ImportExampleGen/component.py  | 112 ++++++
 .../ImportExampleGen/component.yaml           | 202 ++++++++++
 components/tfx/ExampleValidator/component.py  | 117 ++++++
 .../tfx/ExampleValidator/component.yaml       | 203 ++++++++++
 components/tfx/SchemaGen/component.py         |  80 ++++
 components/tfx/SchemaGen/component.yaml       | 156 ++++++++
 components/tfx/StatisticsGen/component.py     |  77 ++++
 components/tfx/StatisticsGen/component.yaml   | 136 +++++++
 components/tfx/Trainer/component.py           | 173 +++++++++
 components/tfx/Trainer/component.yaml         | 347 ++++++++++++++++++
 components/tfx/Transform/component.py         | 141 +++++++
 components/tfx/Transform/component.yaml       | 264 +++++++++++++
 components/tfx/_samples/TFX_pipeline.ipynb    | 162 ++++++++
 21 files changed, 3364 insertions(+)
 create mode 100644 components/tfx/Evaluator/component.py
 create mode 100644 components/tfx/Evaluator/component.yaml
 create mode 100644 components/tfx/ExampleGen/BigQueryExampleGen/component.py
 create mode 100644 components/tfx/ExampleGen/BigQueryExampleGen/component.yaml
 create mode 100644 components/tfx/ExampleGen/CsvExampleGen/component.py
 create mode 100644 components/tfx/ExampleGen/CsvExampleGen/component.yaml
 create mode 100644 components/tfx/ExampleGen/CsvExampleGen/with_URI_IO/component.py
 create mode 100644 components/tfx/ExampleGen/CsvExampleGen/with_URI_IO/component.yaml
 create mode 100644 components/tfx/ExampleGen/ImportExampleGen/component.py
 create mode 100644 components/tfx/ExampleGen/ImportExampleGen/component.yaml
 create mode 100644 components/tfx/ExampleValidator/component.py
 create mode 100644 components/tfx/ExampleValidator/component.yaml
 create mode 100644 components/tfx/SchemaGen/component.py
 create mode 100644 components/tfx/SchemaGen/component.yaml
 create mode 100644 components/tfx/StatisticsGen/component.py
 create mode 100644 components/tfx/StatisticsGen/component.yaml
 create mode 100644 components/tfx/Trainer/component.py
 create mode 100644 components/tfx/Trainer/component.yaml
 create mode 100644 components/tfx/Transform/component.py
 create mode 100644 components/tfx/Transform/component.yaml
 create mode 100644 components/tfx/_samples/TFX_pipeline.ipynb

diff --git a/components/tfx/Evaluator/component.py b/components/tfx/Evaluator/component.py
new file mode 100644
index 00000000000..a21deb8f5d6
--- /dev/null
+++ b/components/tfx/Evaluator/component.py
@@ -0,0 +1,130 @@
+# flake8: noqa TODO
+
+from kfp.components import InputPath, OutputPath
+
+
+def Evaluator(
+    examples_path: InputPath('Examples'),
+    model_exports_path: InputPath('Model'),
+    #model_path: InputPath('Model'),
+
+    output_path: OutputPath('ModelEval'),
+
+    feature_slicing_spec: 'JsonObject: evaluator_pb2.FeatureSlicingSpec' = None,
+):
+    """
+    A TFX component to evaluate models trained by a TFX Trainer component.
+
+    The Evaluator component performs model evaluations in the TFX pipeline and
+    the resultant metrics can be viewed in a Jupyter notebook. It uses the
+    input examples generated from the
+    [ExampleGen](https://www.tensorflow.org/tfx/guide/examplegen)
+    component to evaluate the models.
+
+    Specifically, it can provide:
+      - metrics computed on the entire training and eval dataset
+      - tracking metrics over time
+      - model quality performance on different feature slices
+
+    ## Exporting the EvalSavedModel in Trainer
+
+    In order to set up Evaluator in a TFX pipeline, an EvalSavedModel needs to be
+    exported during training, which is a special SavedModel containing
+    annotations for the metrics, features, labels, and so on in your model.
+    Evaluator uses this EvalSavedModel to compute metrics.
+
+    As part of this, the Trainer component creates eval_input_receiver_fn,
+    analogous to the serving_input_receiver_fn, which will extract the features
+    and labels from the input data. As with serving_input_receiver_fn, there are
+    utility functions to help with this.
+
+    Please see https://www.tensorflow.org/tfx/model_analysis for more details.
+
+    Args:
+      examples: A Channel of 'ExamplesPath' type, usually produced by ExampleGen
+        component. @Ark-kun: Must have the eval split. _required_
+      model_exports: A Channel of 'ModelExportPath' type, usually produced by
+        Trainer component. Will be deprecated in the future in favor of the `model`
+        parameter.
+      #model: Future replacement of the `model_exports` argument.
+      feature_slicing_spec:
+        [evaluator_pb2.FeatureSlicingSpec](https://github.com/tensorflow/tfx/blob/master/tfx/proto/evaluator.proto)
+        instance that describes how Evaluator should slice the data.
+    Returns:
+      output: Channel of `ModelEvalPath` to store the evaluation results.
+
+    Either `model_exports` or `model` must be present in the input arguments.
+ + """ + from tfx.components.evaluator.component import Evaluator + component_class = Evaluator + input_channels_with_splits = {'examples'} + output_channels_with_splits = {} + + + import json + import os + from google.protobuf import json_format, message + from tfx.types import Artifact, channel_utils + + arguments = locals().copy() + + component_class_args = {} + + for name, execution_parameter in component_class.SPEC_CLASS.PARAMETERS.items(): + argument_value_obj = argument_value = arguments.get(name, None) + if argument_value is None: + continue + parameter_type = execution_parameter.type + if isinstance(parameter_type, type) and issubclass(parameter_type, message.Message): # execution_parameter.type can also be a tuple + argument_value_obj = parameter_type() + json_format.Parse(argument_value, argument_value_obj) + component_class_args[name] = argument_value_obj + + for name, channel_parameter in component_class.SPEC_CLASS.INPUTS.items(): + artifact_path = arguments[name + '_path'] + artifacts = [] + if name in input_channels_with_splits: + # Recovering splits + splits = sorted(os.listdir(artifact_path)) + for split in splits: + artifact = Artifact(type_name=channel_parameter.type_name) + artifact.split = split + artifact.uri = os.path.join(artifact_path, split) + '/' + artifacts.append(artifact) + else: + artifact = Artifact(type_name=channel_parameter.type_name) + artifact.uri = artifact_path + '/' # ? + artifacts.append(artifact) + component_class_args[name] = channel_utils.as_channel(artifacts) + + component_class_instance = component_class(**component_class_args) + + input_dict = {name: channel.get() for name, channel in component_class_instance.inputs.get_all().items()} + output_dict = {name: channel.get() for name, channel in component_class_instance.outputs.get_all().items()} + exec_properties = component_class_instance.exec_properties + + # Generating paths for output artifacts + for name, artifacts in output_dict.items(): + base_artifact_path = arguments[name + '_path'] + for artifact in artifacts: + artifact.uri = os.path.join(base_artifact_path, artifact.split) # Default split is '' + + print('component instance: ' + str(component_class_instance)) + + #executor = component_class.EXECUTOR_SPEC.executor_class() # Same + executor = component_class_instance.executor_spec.executor_class() + executor.Do( + input_dict=input_dict, + output_dict=output_dict, + exec_properties=exec_properties, + ) + + +if __name__ == '__main__': + import kfp + kfp.components.func_to_container_op( + Evaluator, + base_image='tensorflow/tfx:0.15.0', + output_component_file='component.yaml' + ) diff --git a/components/tfx/Evaluator/component.yaml b/components/tfx/Evaluator/component.yaml new file mode 100644 index 00000000000..7fa0ad842fe --- /dev/null +++ b/components/tfx/Evaluator/component.yaml @@ -0,0 +1,232 @@ +name: Evaluator +description: | + A TFX component to evaluate models trained by a TFX Trainer component. + + The Evaluator component performs model evaluations in the TFX pipeline and + the resultant metrics can be viewed in a Jupyter notebook. It uses the + input examples generated from the + [ExampleGen](https://www.tensorflow.org/tfx/guide/examplegen) + component to evaluate the models. 
+
+  Specifically, it can provide:
+    - metrics computed on the entire training and eval dataset
+    - tracking metrics over time
+    - model quality performance on different feature slices
+
+  ## Exporting the EvalSavedModel in Trainer
+
+  In order to set up Evaluator in a TFX pipeline, an EvalSavedModel needs to be
+  exported during training, which is a special SavedModel containing
+  annotations for the metrics, features, labels, and so on in your model.
+  Evaluator uses this EvalSavedModel to compute metrics.
+
+  As part of this, the Trainer component creates eval_input_receiver_fn,
+  analogous to the serving_input_receiver_fn, which will extract the features
+  and labels from the input data. As with serving_input_receiver_fn, there are
+  utility functions to help with this.
+
+  Please see https://www.tensorflow.org/tfx/model_analysis for more details.
+
+  Args:
+    examples: A Channel of 'ExamplesPath' type, usually produced by ExampleGen
+      component. @Ark-kun: Must have the eval split. _required_
+    model_exports: A Channel of 'ModelExportPath' type, usually produced by
+      Trainer component. Will be deprecated in the future in favor of the `model`
+      parameter.
+    #model: Future replacement of the `model_exports` argument.
+    feature_slicing_spec:
+      [evaluator_pb2.FeatureSlicingSpec](https://github.com/tensorflow/tfx/blob/master/tfx/proto/evaluator.proto)
+      instance that describes how Evaluator should slice the data.
+  Returns:
+    output: Channel of `ModelEvalPath` to store the evaluation results.
+
+  Either `model_exports` or `model` must be present in the input arguments.
+inputs:
+- name: examples
+  type: Examples
+- name: model_exports
+  type: Model
+- name: feature_slicing_spec
+  type: 'JsonObject: evaluator_pb2.FeatureSlicingSpec'
+  optional: true
+outputs:
+- name: output
+  type: ModelEval
+implementation:
+  container:
+    image: tensorflow/tfx:0.15.0
+    command:
+    - python3
+    - -u
+    - -c
+    - |
+      class OutputPath:
+          '''When creating component from function, OutputPath should be used as function parameter annotation to tell the system that the function wants to output data by writing it into a file with the given path instead of returning the data from the function.'''
+          def __init__(self, type=None):
+              self.type = type
+
+      class InputPath:
+          '''When creating component from function, InputPath should be used as function parameter annotation to tell the system to pass the *data file path* to the function instead of passing the actual data.'''
+          def __init__(self, type=None):
+              self.type = type
+
+      def _make_parent_dirs_and_return_path(file_path: str):
+          import os
+          os.makedirs(os.path.dirname(file_path), exist_ok=True)
+          return file_path
+
+      def Evaluator(
+          examples_path: InputPath('Examples'),
+          model_exports_path: InputPath('Model'),
+          #model_path: InputPath('Model'),
+
+          output_path: OutputPath('ModelEval'),
+
+          feature_slicing_spec: 'JsonObject: evaluator_pb2.FeatureSlicingSpec' = None,
+      ):
+          """
+          A TFX component to evaluate models trained by a TFX Trainer component.
+
+          The Evaluator component performs model evaluations in the TFX pipeline and
+          the resultant metrics can be viewed in a Jupyter notebook. It uses the
+          input examples generated from the
+          [ExampleGen](https://www.tensorflow.org/tfx/guide/examplegen)
+          component to evaluate the models.
+
+          Specifically, it can provide:
+            - metrics computed on the entire training and eval dataset
+            - tracking metrics over time
+            - model quality performance on different feature slices
+
+          ## Exporting the EvalSavedModel in Trainer
+
+          In order to set up Evaluator in a TFX pipeline, an EvalSavedModel needs to be
+          exported during training, which is a special SavedModel containing
+          annotations for the metrics, features, labels, and so on in your model.
+          Evaluator uses this EvalSavedModel to compute metrics.
+
+          As part of this, the Trainer component creates eval_input_receiver_fn,
+          analogous to the serving_input_receiver_fn, which will extract the features
+          and labels from the input data. As with serving_input_receiver_fn, there are
+          utility functions to help with this.
+
+          Please see https://www.tensorflow.org/tfx/model_analysis for more details.
+
+          Args:
+            examples: A Channel of 'ExamplesPath' type, usually produced by ExampleGen
+              component. @Ark-kun: Must have the eval split. _required_
+            model_exports: A Channel of 'ModelExportPath' type, usually produced by
+              Trainer component. Will be deprecated in the future in favor of the `model`
+              parameter.
+            #model: Future replacement of the `model_exports` argument.
+            feature_slicing_spec:
+              [evaluator_pb2.FeatureSlicingSpec](https://github.com/tensorflow/tfx/blob/master/tfx/proto/evaluator.proto)
+              instance that describes how Evaluator should slice the data.
+          Returns:
+            output: Channel of `ModelEvalPath` to store the evaluation results.
+
+          Either `model_exports` or `model` must be present in the input arguments.
+
+          """
+          from tfx.components.evaluator.component import Evaluator
+          component_class = Evaluator
+          input_channels_with_splits = {'examples'}
+          output_channels_with_splits = {}
+
+          import json
+          import os
+          from google.protobuf import json_format, message
+          from tfx.types import Artifact, channel_utils
+
+          arguments = locals().copy()
+
+          component_class_args = {}
+
+          for name, execution_parameter in component_class.SPEC_CLASS.PARAMETERS.items():
+              argument_value_obj = argument_value = arguments.get(name, None)
+              if argument_value is None:
+                  continue
+              parameter_type = execution_parameter.type
+              if isinstance(parameter_type, type) and issubclass(parameter_type, message.Message): # execution_parameter.type can also be a tuple
+                  argument_value_obj = parameter_type()
+                  json_format.Parse(argument_value, argument_value_obj)
+              component_class_args[name] = argument_value_obj
+
+          for name, channel_parameter in component_class.SPEC_CLASS.INPUTS.items():
+              artifact_path = arguments[name + '_path']
+              artifacts = []
+              if name in input_channels_with_splits:
+                  # Recovering splits
+                  splits = sorted(os.listdir(artifact_path))
+                  for split in splits:
+                      artifact = Artifact(type_name=channel_parameter.type_name)
+                      artifact.split = split
+                      artifact.uri = os.path.join(artifact_path, split) + '/'
+                      artifacts.append(artifact)
+              else:
+                  artifact = Artifact(type_name=channel_parameter.type_name)
+                  artifact.uri = artifact_path + '/' # ?
+                  artifacts.append(artifact)
+              component_class_args[name] = channel_utils.as_channel(artifacts)
+
+          component_class_instance = component_class(**component_class_args)
+
+          input_dict = {name: channel.get() for name, channel in component_class_instance.inputs.get_all().items()}
+          output_dict = {name: channel.get() for name, channel in component_class_instance.outputs.get_all().items()}
+          exec_properties = component_class_instance.exec_properties
+
+          # Generating paths for output artifacts
+          for name, artifacts in output_dict.items():
+              base_artifact_path = arguments[name + '_path']
+              for artifact in artifacts:
+                  artifact.uri = os.path.join(base_artifact_path, artifact.split) # Default split is ''
+
+          print('component instance: ' + str(component_class_instance))
+
+          #executor = component_class.EXECUTOR_SPEC.executor_class() # Same
+          executor = component_class_instance.executor_spec.executor_class()
+          executor.Do(
+              input_dict=input_dict,
+              output_dict=output_dict,
+              exec_properties=exec_properties,
+          )
+
+      import argparse
+      _parser = argparse.ArgumentParser(prog='Evaluator', description="A TFX component to evaluate models trained by a TFX Trainer component.\n\n    The Evaluator component performs model evaluations in the TFX pipeline and\n    the resultant metrics can be viewed in a Jupyter notebook. It uses the\n    input examples generated from the\n    [ExampleGen](https://www.tensorflow.org/tfx/guide/examplegen)\n    component to evaluate the models.\n\n    Specifically, it can provide:\n      - metrics computed on the entire training and eval dataset\n      - tracking metrics over time\n      - model quality performance on different feature slices\n\n    ## Exporting the EvalSavedModel in Trainer\n\n    In order to set up Evaluator in a TFX pipeline, an EvalSavedModel needs to be\n    exported during training, which is a special SavedModel containing\n    annotations for the metrics, features, labels, and so on in your model.\n    Evaluator uses this EvalSavedModel to compute metrics.\n\n    As part of this, the Trainer component creates eval_input_receiver_fn,\n    analogous to the serving_input_receiver_fn, which will extract the features\n    and labels from the input data. As with serving_input_receiver_fn, there are\n    utility functions to help with this.\n\n    Please see https://www.tensorflow.org/tfx/model_analysis for more details.\n\n    Args:\n      examples: A Channel of 'ExamplesPath' type, usually produced by ExampleGen\n        component. @Ark-kun: Must have the eval split. _required_\n      model_exports: A Channel of 'ModelExportPath' type, usually produced by\n        Trainer component. 
Will be deprecated in the future in favor of the `model`\n        parameter.\n      #model: Future replacement of the `model_exports` argument.\n      feature_slicing_spec:\n        [evaluator_pb2.FeatureSlicingSpec](https://github.com/tensorflow/tfx/blob/master/tfx/proto/evaluator.proto)\n        instance that describes how Evaluator should slice the data.\n    Returns:\n      output: Channel of `ModelEvalPath` to store the evaluation results.\n\n    Either `model_exports` or `model` must be present in the input arguments.\n")
+      _parser.add_argument("--examples", dest="examples_path", type=str, required=True, default=argparse.SUPPRESS)
+      _parser.add_argument("--model-exports", dest="model_exports_path", type=str, required=True, default=argparse.SUPPRESS)
+      _parser.add_argument("--feature-slicing-spec", dest="feature_slicing_spec", type=str, required=False, default=argparse.SUPPRESS)
+      _parser.add_argument("--output", dest="output_path", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS)
+      _parsed_args = vars(_parser.parse_args())
+      _output_files = _parsed_args.pop("_output_paths", [])
+
+      _outputs = Evaluator(**_parsed_args)
+
+      if not hasattr(_outputs, '__getitem__') or isinstance(_outputs, str):
+          _outputs = [_outputs]
+
+      _output_serializers = [
+
+      ]
+
+      import os
+      for idx, output_file in enumerate(_output_files):
+          try:
+              os.makedirs(os.path.dirname(output_file))
+          except OSError:
+              pass
+          with open(output_file, 'w') as f:
+              f.write(_output_serializers[idx](_outputs[idx]))
+    args:
+    - --examples
+    - inputPath: examples
+    - --model-exports
+    - inputPath: model_exports
+    - if:
+        cond:
+          isPresent: feature_slicing_spec
+        then:
+        - --feature-slicing-spec
+        - inputValue: feature_slicing_spec
+    - --output
+    - outputPath: output
diff --git a/components/tfx/ExampleGen/BigQueryExampleGen/component.py b/components/tfx/ExampleGen/BigQueryExampleGen/component.py
new file mode 100644
index 00000000000..a7473e0bf81
--- /dev/null
+++ b/components/tfx/ExampleGen/BigQueryExampleGen/component.py
@@ -0,0 +1,107 @@
+# flake8: noqa TODO
+
+from kfp.components import InputPath, OutputPath
+
+
+def BigQueryExampleGen(
+    example_artifacts_path: OutputPath('Examples'),
+
+    query: str = None,
+    input_config: 'JsonObject: example_gen_pb2.Input' = None,
+    output_config: 'JsonObject: example_gen_pb2.Output' = None,
+):
+    """
+    Official TFX BigQueryExampleGen component.
+
+    The BigQuery examplegen component takes a query, and generates train
+    and eval examples for downstream components.
+
+
+    Args:
+      query: BigQuery sql string, query result will be treated as a single
+        split, can be overwritten by input_config.
+      input_config: An example_gen_pb2.Input instance with Split.pattern as
+        BigQuery sql string. If set, it overwrites the 'query' arg, and allows
+        different queries per split.
+      output_config: An example_gen_pb2.Output instance, providing output
+        configuration. If unset, default splits will be 'train' and 'eval' with
+        size 2:1.
+    Returns:
+      example_artifacts: Optional channel of 'ExamplesPath' for output train and
+        eval examples.
+
+    Raises:
+      RuntimeError: Only one of query and input_config should be set.
+ """ + from tfx.components.example_gen.csv_example_gen.component import BigQueryExampleGen + component_class = BigQueryExampleGen + input_channels_with_splits = {} + output_channels_with_splits = {'example_artifacts'} + + + import json + import os + from google.protobuf import json_format, message + from tfx.types import Artifact, channel_utils + + arguments = locals().copy() + + component_class_args = {} + + for name, execution_parameter in component_class.SPEC_CLASS.PARAMETERS.items(): + argument_value_obj = argument_value = arguments.get(name, None) + if argument_value is None: + continue + parameter_type = execution_parameter.type + if isinstance(parameter_type, type) and issubclass(parameter_type, message.Message): # execution_parameter.type can also be a tuple + argument_value_obj = parameter_type() + json_format.Parse(argument_value, argument_value_obj) + component_class_args[name] = argument_value_obj + + for name, channel_parameter in component_class.SPEC_CLASS.INPUTS.items(): + artifact_path = arguments[name + '_path'] + artifacts = [] + if name in input_channels_with_splits: + # Recovering splits + splits = sorted(os.listdir(artifact_path)) + for split in splits: + artifact = Artifact(type_name=channel_parameter.type_name) + artifact.split = split + artifact.uri = os.path.join(artifact_path, split) + '/' + artifacts.append(artifact) + else: + artifact = Artifact(type_name=channel_parameter.type_name) + artifact.uri = artifact_path + '/' # ? + artifacts.append(artifact) + component_class_args[name] = channel_utils.as_channel(artifacts) + + component_class_instance = component_class(**component_class_args) + + input_dict = {name: channel.get() for name, channel in component_class_instance.inputs.get_all().items()} + output_dict = {name: channel.get() for name, channel in component_class_instance.outputs.get_all().items()} + exec_properties = component_class_instance.exec_properties + + # Generating paths for output artifacts + for name, artifacts in output_dict.items(): + base_artifact_path = arguments[name + '_path'] + for artifact in artifacts: + artifact.uri = os.path.join(base_artifact_path, artifact.split) # Default split is '' + + print('component instance: ' + str(component_class_instance)) + + #executor = component_class.EXECUTOR_SPEC.executor_class() # Same + executor = component_class_instance.executor_spec.executor_class() + executor.Do( + input_dict=input_dict, + output_dict=output_dict, + exec_properties=exec_properties, + ) + + +if __name__ == '__main__': + import kfp + kfp.components.func_to_container_op( + BigQueryExampleGen, + base_image='tensorflow/tfx:0.15.0', + output_component_file='component.yaml' + ) diff --git a/components/tfx/ExampleGen/BigQueryExampleGen/component.yaml b/components/tfx/ExampleGen/BigQueryExampleGen/component.yaml new file mode 100644 index 00000000000..f0c8e76b3a6 --- /dev/null +++ b/components/tfx/ExampleGen/BigQueryExampleGen/component.yaml @@ -0,0 +1,193 @@ +name: Bigqueryexamplegen +description: | + Official TFX BigQueryExampleGen component. + + The BigQuery examplegen component takes a query, and generates train + and eval examples for downsteam components. + + + Args: + query: BigQuery sql string, query result will be treated as a single + split, can be overwritten by input_config. + input_config: An example_gen_pb2.Input instance with Split.pattern as + BigQuery sql string. If set, it overwrites the 'query' arg, and allows + different queries per split. 
+    output_config: An example_gen_pb2.Output instance, providing output
+      configuration. If unset, default splits will be 'train' and 'eval' with
+      size 2:1.
+  Returns:
+    example_artifacts: Optional channel of 'ExamplesPath' for output train and
+      eval examples.
+
+  Raises:
+    RuntimeError: Only one of query and input_config should be set.
+inputs:
+- name: query
+  type: String
+  optional: true
+- name: input_config
+  type: 'JsonObject: example_gen_pb2.Input'
+  optional: true
+- name: output_config
+  type: 'JsonObject: example_gen_pb2.Output'
+  optional: true
+outputs:
+- name: example_artifacts
+  type: Examples
+implementation:
+  container:
+    image: tensorflow/tfx:0.15.0
+    command:
+    - python3
+    - -u
+    - -c
+    - |
+      def _make_parent_dirs_and_return_path(file_path: str):
+          import os
+          os.makedirs(os.path.dirname(file_path), exist_ok=True)
+          return file_path
+
+      class OutputPath:
+          '''When creating component from function, OutputPath should be used as function parameter annotation to tell the system that the function wants to output data by writing it into a file with the given path instead of returning the data from the function.'''
+          def __init__(self, type=None):
+              self.type = type
+
+      def BigQueryExampleGen(
+          example_artifacts_path: OutputPath('Examples'),
+
+          query: str = None,
+          input_config: 'JsonObject: example_gen_pb2.Input' = None,
+          output_config: 'JsonObject: example_gen_pb2.Output' = None,
+      ):
+          """
+          Official TFX BigQueryExampleGen component.
+
+          The BigQuery examplegen component takes a query, and generates train
+          and eval examples for downstream components.
+
+          Args:
+            query: BigQuery sql string, query result will be treated as a single
+              split, can be overwritten by input_config.
+            input_config: An example_gen_pb2.Input instance with Split.pattern as
+              BigQuery sql string. If set, it overwrites the 'query' arg, and allows
+              different queries per split.
+            output_config: An example_gen_pb2.Output instance, providing output
+              configuration. If unset, default splits will be 'train' and 'eval' with
+              size 2:1.
+          Returns:
+            example_artifacts: Optional channel of 'ExamplesPath' for output train and
+              eval examples.
+
+          Raises:
+            RuntimeError: Only one of query and input_config should be set.
+ """ + from tfx.components.example_gen.csv_example_gen.component import BigQueryExampleGen + component_class = BigQueryExampleGen + input_channels_with_splits = {} + output_channels_with_splits = {'example_artifacts'} + + import json + import os + from google.protobuf import json_format, message + from tfx.types import Artifact, channel_utils + + arguments = locals().copy() + + component_class_args = {} + + for name, execution_parameter in component_class.SPEC_CLASS.PARAMETERS.items(): + argument_value_obj = argument_value = arguments.get(name, None) + if argument_value is None: + continue + parameter_type = execution_parameter.type + if isinstance(parameter_type, type) and issubclass(parameter_type, message.Message): # execution_parameter.type can also be a tuple + argument_value_obj = parameter_type() + json_format.Parse(argument_value, argument_value_obj) + component_class_args[name] = argument_value_obj + + for name, channel_parameter in component_class.SPEC_CLASS.INPUTS.items(): + artifact_path = arguments[name + '_path'] + artifacts = [] + if name in input_channels_with_splits: + # Recovering splits + splits = sorted(os.listdir(artifact_path)) + for split in splits: + artifact = Artifact(type_name=channel_parameter.type_name) + artifact.split = split + artifact.uri = os.path.join(artifact_path, split) + '/' + artifacts.append(artifact) + else: + artifact = Artifact(type_name=channel_parameter.type_name) + artifact.uri = artifact_path + '/' # ? + artifacts.append(artifact) + component_class_args[name] = channel_utils.as_channel(artifacts) + + component_class_instance = component_class(**component_class_args) + + input_dict = {name: channel.get() for name, channel in component_class_instance.inputs.get_all().items()} + output_dict = {name: channel.get() for name, channel in component_class_instance.outputs.get_all().items()} + exec_properties = component_class_instance.exec_properties + + # Generating paths for output artifacts + for name, artifacts in output_dict.items(): + base_artifact_path = arguments[name + '_path'] + for artifact in artifacts: + artifact.uri = os.path.join(base_artifact_path, artifact.split) # Default split is '' + + print('component instance: ' + str(component_class_instance)) + + #executor = component_class.EXECUTOR_SPEC.executor_class() # Same + executor = component_class_instance.executor_spec.executor_class() + executor.Do( + input_dict=input_dict, + output_dict=output_dict, + exec_properties=exec_properties, + ) + + import argparse + _parser = argparse.ArgumentParser(prog='Bigqueryexamplegen', description="Official TFX BigQueryExampleGen component.\n\n The BigQuery examplegen component takes a query, and generates train\n and eval examples for downsteam components.\n\n\n Args:\n query: BigQuery sql string, query result will be treated as a single\n split, can be overwritten by input_config.\n input_config: An example_gen_pb2.Input instance with Split.pattern as\n BigQuery sql string. If set, it overwrites the 'query' arg, and allows\n different queries per split.\n output_config: An example_gen_pb2.Output instance, providing output\n configuration. 
If unset, default splits will be 'train' and 'eval' with\n size 2:1.\n Returns:\n example_artifacts: Optional channel of 'ExamplesPath' for output train and\n eval examples.\n\n Raises:\n RuntimeError: Only one of query and input_config should be set.\n") + _parser.add_argument("--query", dest="query", type=str, required=False, default=argparse.SUPPRESS) + _parser.add_argument("--input-config", dest="input_config", type=str, required=False, default=argparse.SUPPRESS) + _parser.add_argument("--output-config", dest="output_config", type=str, required=False, default=argparse.SUPPRESS) + _parser.add_argument("--example-artifacts", dest="example_artifacts_path", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + _output_files = _parsed_args.pop("_output_paths", []) + + _outputs = BigQueryExampleGen(**_parsed_args) + + if not hasattr(_outputs, '__getitem__') or isinstance(_outputs, str): + _outputs = [_outputs] + + _output_serializers = [ + + ] + + import os + for idx, output_file in enumerate(_output_files): + try: + os.makedirs(os.path.dirname(output_file)) + except OSError: + pass + with open(output_file, 'w') as f: + f.write(_output_serializers[idx](_outputs[idx])) + args: + - if: + cond: + isPresent: query + then: + - --query + - inputValue: query + - if: + cond: + isPresent: input_config + then: + - --input-config + - inputValue: input_config + - if: + cond: + isPresent: output_config + then: + - --output-config + - inputValue: output_config + - --example-artifacts + - outputPath: example_artifacts diff --git a/components/tfx/ExampleGen/CsvExampleGen/component.py b/components/tfx/ExampleGen/CsvExampleGen/component.py new file mode 100644 index 00000000000..e956b29a883 --- /dev/null +++ b/components/tfx/ExampleGen/CsvExampleGen/component.py @@ -0,0 +1,96 @@ +# flake8: noqa TODO + +from kfp.components import InputPath, OutputPath + +def CsvExampleGen( + # Inputs + input_base_path: InputPath('ExternalPath'), + #input_base_path: 'ExternalPath', # A Channel of 'ExternalPath' type, which includes one artifact whose uri is an external directory with csv files inside (required). + + # Outputs + example_artifacts_path: OutputPath('Examples'), + #example_artifacts_path: 'ExamplesPath', + + # Execution properties + #input_config_splits: {'List' : {'item_type': 'ExampleGen.Input.Split'}}, + input_config: 'ExampleGen.Input' = None, # = '{"splits": []}', # JSON-serialized example_gen_pb2.Input instance, providing input configuration. If unset, the files under input_base will be treated as a single split. + #output_config_splits: {'List' : {'item_type': 'ExampleGen.SplitConfig'}}, + output_config: 'ExampleGen.Output' = None, # = '{"splitConfig": {"splits": []}}', # JSON-serialized example_gen_pb2.Output instance, providing output configuration. If unset, default splits will be 'train' and 'eval' with size 2:1. + #custom_config: 'ExampleGen.CustomConfig' = None, +): + """Executes the CsvExampleGen component. + + Args: + input_base: A Channel of 'ExternalPath' type, which includes one artifact + whose uri is an external directory with csv files inside (required). + input_config: An example_gen_pb2.Input instance, providing input + configuration. If unset, the files under input_base will be treated as a + single split. + output_config: An example_gen_pb2.Output instance, providing output + configuration. If unset, default splits will be 'train' and 'eval' with + size 2:1. + ??? 
input: Forwards compatibility alias for the 'input_base' argument. + Returns: + example_artifacts: Artifact of type 'Examples' for output train and + eval examples. + """ + + import json + import os + from google.protobuf import json_format + from tfx.components.example_gen.csv_example_gen.component import CsvExampleGen + from tfx.proto import example_gen_pb2 + from tfx.types import standard_artifacts + from tfx.types import channel_utils + + # Create input dict. + input_base = standard_artifacts.ExternalArtifact() + input_base.uri = input_base_path + input_base_channel = channel_utils.as_channel([input_base]) + + input_config_obj = None + if input_config: + input_config_obj = example_gen_pb2.Input() + json_format.Parse(input_config, input_config_obj) + + output_config_obj = None + if output_config: + output_config_obj = example_gen_pb2.Output() + json_format.Parse(output_config, output_config_obj) + + component_class_instance = CsvExampleGen( + input=input_base_channel, + input_config=input_config_obj, + output_config=output_config_obj, + ) + + # component_class_instance.inputs/outputs are wrappers that do not behave like real dictionaries. The underlying dict can be accessed using .get_all() + # Channel artifacts can be accessed by calling .get() + input_dict = {name: channel.get() for name, channel in component_class_instance.inputs.get_all().items()} + output_dict = {name: channel.get() for name, channel in component_class_instance.outputs.get_all().items()} + + exec_properties = component_class_instance.exec_properties + + # Generating paths for output artifacts + for output_artifact in output_dict['examples']: + output_artifact.uri = example_artifacts_path + if output_artifact.split: + output_artifact.uri = os.path.join(output_artifact.uri, output_artifact.split) + + print('component instance: ' + str(component_class_instance)) + + executor = CsvExampleGen.EXECUTOR_SPEC.executor_class() + executor.Do( + input_dict=input_dict, + output_dict=output_dict, + exec_properties=exec_properties, + ) + + +if __name__ == '__main__': + import kfp + kfp.components.func_to_container_op( + CsvExampleGen, + base_image='tensorflow/tfx:0.15.0', + output_component_file='component.yaml' + ) diff --git a/components/tfx/ExampleGen/CsvExampleGen/component.yaml b/components/tfx/ExampleGen/CsvExampleGen/component.yaml new file mode 100644 index 00000000000..b5a3c52a8d2 --- /dev/null +++ b/components/tfx/ExampleGen/CsvExampleGen/component.yaml @@ -0,0 +1,172 @@ +name: CsvExampleGen +inputs: +- {name: input_base, type: ExternalPath} +- {name: input_config, optional: true, type: ExampleGen.Input} +- {name: output_config, optional: true, type: ExampleGen.Output} +outputs: +- {name: example_artifacts, type: Examples} +description: | + Executes the CsvExampleGen component. + + Args: + input_base: A Channel of 'ExternalPath' type, which includes one artifact + whose uri is an external directory with csv files inside (required). + input_config: An example_gen_pb2.Input instance, providing input + configuration. If unset, the files under input_base will be treated as a + single split. + output_config: An example_gen_pb2.Output instance, providing output + configuration. If unset, default splits will be 'train' and 'eval' with + size 2:1. + ??? input: Forwards compatibility alias for the 'input_base' argument. + Returns: + example_artifacts: Artifact of type 'Examples' for output train and + eval examples. 
+implementation: + container: + image: tensorflow/tfx:0.15.0 + command: + - python3 + - -u + - -c + - | + def _make_parent_dirs_and_return_path(file_path: str): + import os + os.makedirs(os.path.dirname(file_path), exist_ok=True) + return file_path + + class OutputPath: + '''When creating component from function, OutputPath should be used as function parameter annotation to tell the system that the function wants to output data by writing it into a file with the given path instead of returning the data from the function.''' + def __init__(self, type=None): + self.type = type + + class InputPath: + '''When creating component from function, InputPath should be used as function parameter annotation to tell the system to pass the *data file path* to the function instead of passing the actual data.''' + def __init__(self, type=None): + self.type = type + + def CsvExampleGen( + # Inputs + input_base_path: InputPath('ExternalPath'), + #input_base_path: 'ExternalPath', # A Channel of 'ExternalPath' type, which includes one artifact whose uri is an external directory with csv files inside (required). + + # Outputs + example_artifacts_path: OutputPath('ExamplesPath'), + #example_artifacts_path: 'ExamplesPath', + + # Execution properties + #input_config_splits: {'List' : {'item_type': 'ExampleGen.Input.Split'}}, + input_config: 'ExampleGen.Input' = None, # = '{"splits": []}', # JSON-serialized example_gen_pb2.Input instance, providing input configuration. If unset, the files under input_base will be treated as a single split. + #output_config_splits: {'List' : {'item_type': 'ExampleGen.SplitConfig'}}, + output_config: 'ExampleGen.Output' = None, # = '{"splitConfig": {"splits": []}}', # JSON-serialized example_gen_pb2.Output instance, providing output configuration. If unset, default splits will be 'train' and 'eval' with size 2:1. + #custom_config: 'ExampleGen.CustomConfig' = None, + ): + """\ + Executes the CsvExampleGen component. + + Args: + input_base: A Channel of 'ExternalPath' type, which includes one artifact + whose uri is an external directory with csv files inside (required). + input_config: An example_gen_pb2.Input instance, providing input + configuration. If unset, the files under input_base will be treated as a + single split. + output_config: An example_gen_pb2.Output instance, providing output + configuration. If unset, default splits will be 'train' and 'eval' with + size 2:1. + ??? input: Forwards compatibility alias for the 'input_base' argument. + Returns: + example_artifacts: Artifact of type 'ExamplesPath' for output train and + eval examples. + """ + + import json + import os + from google.protobuf import json_format + from tfx.components.example_gen.csv_example_gen.component import CsvExampleGen + from tfx.proto import example_gen_pb2 + from tfx.types import standard_artifacts + from tfx.types import channel_utils + + # Create input dict. 
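+          # The lines below wrap the raw input path in an ExternalArtifact and a
+          # Channel, since TFX components consume Channels of Artifacts rather
+          # than plain file paths.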
+ input_base = standard_artifacts.ExternalArtifact() + input_base.uri = input_base_path + input_base_channel = channel_utils.as_channel([input_base]) + + input_config_obj = None + if input_config: + input_config_obj = example_gen_pb2.Input() + json_format.Parse(input_config, input_config_obj) + + output_config_obj = None + if output_config: + output_config_obj = example_gen_pb2.Output() + json_format.Parse(output_config, output_config_obj) + + component_class_instance = CsvExampleGen( + input=input_base_channel, + input_config=input_config_obj, + output_config=output_config_obj, + ) + + # component_class_instance.inputs/outputs are wrappers that do not behave like real dictionaries. The underlying dict can be accessed using .get_all() + # Channel artifacts can be accessed by calling .get() + input_dict = {name: channel.get() for name, channel in component_class_instance.inputs.get_all().items()} + output_dict = {name: channel.get() for name, channel in component_class_instance.outputs.get_all().items()} + + exec_properties = component_class_instance.exec_properties + + # Generating paths for output artifacts + for output_artifact in output_dict['examples']: + output_artifact.uri = example_artifacts_path + if output_artifact.split: + output_artifact.uri = os.path.join(output_artifact.uri, output_artifact.split) + + print('component instance: ' + str(component_class_instance)) + + executor = CsvExampleGen.EXECUTOR_SPEC.executor_class() + executor.Do( + input_dict=input_dict, + output_dict=output_dict, + exec_properties=exec_properties, + ) + + import argparse + _parser = argparse.ArgumentParser(prog='Csvexamplegen', description="Executes the CsvExampleGen component.\n\n Args:\n input_base: A Channel of 'ExternalPath' type, which includes one artifact\n whose uri is an external directory with csv files inside (required).\n input_config: An example_gen_pb2.Input instance, providing input\n configuration. If unset, the files under input_base will be treated as a\n single split.\n output_config: An example_gen_pb2.Output instance, providing output\n configuration. If unset, default splits will be 'train' and 'eval' with\n size 2:1.\n ??? 
input: Forwards compatibility alias for the 'input_base' argument.\n Returns:\n example_artifacts: Artifact of type 'ExamplesPath' for output train and\n eval examples.\n") + _parser.add_argument("--input-base", dest="input_base_path", type=str, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--input-config", dest="input_config", type=str, required=False, default=argparse.SUPPRESS) + _parser.add_argument("--output-config", dest="output_config", type=str, required=False, default=argparse.SUPPRESS) + _parser.add_argument("--example-artifacts", dest="example_artifacts_path", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + _output_files = _parsed_args.pop("_output_paths", []) + + _outputs = CsvExampleGen(**_parsed_args) + + if not hasattr(_outputs, '__getitem__') or isinstance(_outputs, str): + _outputs = [_outputs] + + _output_serializers = [ + + ] + + import os + for idx, output_file in enumerate(_output_files): + try: + os.makedirs(os.path.dirname(output_file)) + except OSError: + pass + with open(output_file, 'w') as f: + f.write(_output_serializers[idx](_outputs[idx])) + args: + - --input-base + - {inputPath: input_base} + - if: + cond: {isPresent: input_config} + then: + - --input-config + - {inputValue: input_config} + - if: + cond: {isPresent: output_config} + then: + - --output-config + - {inputValue: output_config} + - --example-artifacts + - {outputPath: example_artifacts} diff --git a/components/tfx/ExampleGen/CsvExampleGen/with_URI_IO/component.py b/components/tfx/ExampleGen/CsvExampleGen/with_URI_IO/component.py new file mode 100644 index 00000000000..8e78dd05bfb --- /dev/null +++ b/components/tfx/ExampleGen/CsvExampleGen/with_URI_IO/component.py @@ -0,0 +1,98 @@ +# flake8: noqa TODO + +from typing import NamedTuple + +def CsvExampleGen_GCS( # + # Inputs + #input_base_path: InputPath('ExternalPath'), + input_base_path: 'ExternalPath', # A Channel of 'ExternalPath' type, which includes one artifact whose uri is an external directory with csv files inside (required). + + # Outputs + #example_artifacts_path: OutputPath('ExamplesPath'), + example_artifacts_path: 'ExamplesPath', + + # Execution properties + #input_config_splits: {'List' : {'item_type': 'ExampleGen.Input.Split'}}, + input_config: 'ExampleGen.Input' = None, # = '{"splits": []}', # JSON-serialized example_gen_pb2.Input instance, providing input configuration. If unset, the files under input_base will be treated as a single split. + #output_config_splits: {'List' : {'item_type': 'ExampleGen.SplitConfig'}}, + output_config: 'ExampleGen.Output' = None, # = '{"splitConfig": {"splits": []}}', # JSON-serialized example_gen_pb2.Output instance, providing output configuration. If unset, default splits will be 'train' and 'eval' with size 2:1. + #custom_config: 'ExampleGen.CustomConfig' = None, +) -> NamedTuple('Outputs', [ + ('example_artifacts', 'ExamplesPath'), +]): + """Executes the CsvExampleGen component. + + Args: + input_base: A Channel of 'ExternalPath' type, which includes one artifact + whose uri is an external directory with csv files inside (required). + input_config: An example_gen_pb2.Input instance, providing input + configuration. If unset, the files under input_base will be treated as a + single split. + output_config: An example_gen_pb2.Output instance, providing output + configuration. If unset, default splits will be 'train' and 'eval' with + size 2:1. + ??? 
input: Forwards compatibility alias for the 'input_base' argument. + Returns: + example_artifacts: Artifact of type 'ExamplesPath' for output train and + eval examples. + """ + + import json + import os + from google.protobuf import json_format + from tfx.components.example_gen.csv_example_gen.component import CsvExampleGen + from tfx.proto import example_gen_pb2 + from tfx.types import standard_artifacts + from tfx.types import channel_utils + + # Create input dict. + input_base = standard_artifacts.ExternalArtifact() + input_base.uri = input_base_path + input_base_channel = channel_utils.as_channel([input_base]) + + input_config_obj = None + if input_config: + input_config_obj = example_gen_pb2.Input() + json_format.Parse(input_config, input_config_obj) + + output_config_obj = None + if output_config: + output_config_obj = example_gen_pb2.Output() + json_format.Parse(output_config, output_config_obj) + + component_class_instance = CsvExampleGen( + input=input_base_channel, + input_config=input_config_obj, + output_config=output_config_obj, + ) + + # component_class_instance.inputs/outputs are wrappers that do not behave like real dictionaries. The underlying dict can be accessed using .get_all() + # Channel artifacts can be accessed by calling .get() + input_dict = {name: channel.get() for name, channel in component_class_instance.inputs.get_all().items()} + output_dict = {name: channel.get() for name, channel in component_class_instance.outputs.get_all().items()} + exec_properties = component_class_instance.exec_properties + + # Generating paths for output artifacts + for output_artifact in output_dict['examples']: + output_artifact.uri = example_artifacts_path + if output_artifact.split: + output_artifact.uri = os.path.join(output_artifact.uri, output_artifact.split) + + print('component instance: ' + str(component_class_instance)) + + executor = CsvExampleGen.EXECUTOR_SPEC.executor_class() + executor.Do( + input_dict=input_dict, + output_dict=output_dict, + exec_properties=exec_properties, + ) + + return (example_artifacts_path,) + +if __name__ == '__main__': + import kfp + kfp.components.func_to_container_op( + CsvExampleGen_GCS, + base_image='tensorflow/tfx:0.15.0', + output_component_file='component.yaml' + ) diff --git a/components/tfx/ExampleGen/CsvExampleGen/with_URI_IO/component.yaml b/components/tfx/ExampleGen/CsvExampleGen/with_URI_IO/component.yaml new file mode 100644 index 00000000000..54a6868ad67 --- /dev/null +++ b/components/tfx/ExampleGen/CsvExampleGen/with_URI_IO/component.yaml @@ -0,0 +1,166 @@ +name: CsvExampleGen GCS +inputs: +- {name: input_base_path, type: ExternalPath} +- {name: example_artifacts_path, type: ExamplesPath} +- {name: input_config, optional: true, type: ExampleGen.Input} +- {name: output_config, optional: true, type: ExampleGen.Output} +outputs: +- {name: example_artifacts, type: ExamplesPath} +description: | + Executes the CsvExampleGen component. + + Args: + input_base: A Channel of 'ExternalPath' type, which includes one artifact + whose uri is an external directory with csv files inside (required). + input_config: An example_gen_pb2.Input instance, providing input + configuration. If unset, the files under input_base will be treated as a + single split. + output_config: An example_gen_pb2.Output instance, providing output + configuration. If unset, default splits will be 'train' and 'eval' with + size 2:1. + ??? input: Forwards compatibility alias for the 'input_base' argument. 
+ Returns: + example_artifacts: Artifact of type 'ExamplesPath' for output train and + eval examples. +implementation: + container: + image: tensorflow/tfx:0.15.0 + command: + - python3 + - -u + - -c + - | + from typing import NamedTuple + + def CsvExampleGen_GCS( # + # Inputs + #input_base_path: InputPath('ExternalPath'), + input_base_path: 'ExternalPath', # A Channel of 'ExternalPath' type, which includes one artifact whose uri is an external directory with csv files inside (required). + + # Outputs + #example_artifacts_path: OutputPath('ExamplesPath'), + example_artifacts_path: 'ExamplesPath', + + # Execution properties + #input_config_splits: {'List' : {'item_type': 'ExampleGen.Input.Split'}}, + input_config: 'ExampleGen.Input' = None, # = '{"splits": []}', # JSON-serialized example_gen_pb2.Input instance, providing input configuration. If unset, the files under input_base will be treated as a single split. + #output_config_splits: {'List' : {'item_type': 'ExampleGen.SplitConfig'}}, + output_config: 'ExampleGen.Output' = None, # = '{"splitConfig": {"splits": []}}', # JSON-serialized example_gen_pb2.Output instance, providing output configuration. If unset, default splits will be 'train' and 'eval' with size 2:1. + #custom_config: 'ExampleGen.CustomConfig' = None, + ) -> NamedTuple('Outputs', [ + ('example_artifacts', 'ExamplesPath'), + ]): + """Executes the CsvExampleGen component. + + Args: + input_base: A Channel of 'ExternalPath' type, which includes one artifact + whose uri is an external directory with csv files inside (required). + input_config: An example_gen_pb2.Input instance, providing input + configuration. If unset, the files under input_base will be treated as a + single split. + output_config: An example_gen_pb2.Output instance, providing output + configuration. If unset, default splits will be 'train' and 'eval' with + size 2:1. + ??? input: Forwards compatibility alias for the 'input_base' argument. + Returns: + example_artifacts: Artifact of type 'ExamplesPath' for output train and + eval examples. + """ + + import json + import os + from google.protobuf import json_format + from tfx.components.example_gen.csv_example_gen.component import CsvExampleGen + from tfx.proto import example_gen_pb2 + from tfx.types import standard_artifacts + from tfx.types import channel_utils + + # Create input dict. + input_base = standard_artifacts.ExternalArtifact() + input_base.uri = input_base_path + input_base_channel = channel_utils.as_channel([input_base]) + + input_config_obj = None + if input_config: + input_config_obj = example_gen_pb2.Input() + json_format.Parse(input_config, input_config_obj) + + output_config_obj = None + if output_config: + output_config_obj = example_gen_pb2.Output() + json_format.Parse(output_config, output_config_obj) + + component_class_instance = CsvExampleGen( + input=input_base_channel, + input_config=input_config_obj, + output_config=output_config_obj, + ) + + # component_class_instance.inputs/outputs are wrappers that do not behave like real dictionaries. 
The underlying dict can be accessed using .get_all() + # Channel artifacts can be accessed by calling .get() + input_dict = {name: channel.get() for name, channel in component_class_instance.inputs.get_all().items()} + output_dict = {name: channel.get() for name, channel in component_class_instance.outputs.get_all().items()} + exec_properties = component_class_instance.exec_properties + + # Generating paths for output artifacts + for output_artifact in output_dict['examples']: + output_artifact.uri = example_artifacts_path + if output_artifact.split: + output_artifact.uri = os.path.join(output_artifact.uri, output_artifact.split) + + print('component instance: ' + str(component_class_instance)) + + executor = CsvExampleGen.EXECUTOR_SPEC.executor_class() + executor.Do( + input_dict=input_dict, + output_dict=output_dict, + exec_properties=exec_properties, + ) + + return (example_artifacts_path,) + + import argparse + _parser = argparse.ArgumentParser(prog='Csvexamplegen gcs', description="Executes the CsvExampleGen component.\n\n Args:\n input_base: A Channel of 'ExternalPath' type, which includes one artifact\n whose uri is an external directory with csv files inside (required).\n input_config: An example_gen_pb2.Input instance, providing input\n configuration. If unset, the files under input_base will be treated as a\n single split.\n output_config: An example_gen_pb2.Output instance, providing output\n configuration. If unset, default splits will be 'train' and 'eval' with\n size 2:1.\n ??? input: Forwards compatibility alias for the 'input_base' argument.\n Returns:\n example_artifacts: Artifact of type 'ExamplesPath' for output train and\n eval examples.\n") + _parser.add_argument("--input-base-path", dest="input_base_path", type=str, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--example-artifacts-path", dest="example_artifacts_path", type=str, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--input-config", dest="input_config", type=str, required=False, default=argparse.SUPPRESS) + _parser.add_argument("--output-config", dest="output_config", type=str, required=False, default=argparse.SUPPRESS) + _parser.add_argument("----output-paths", dest="_output_paths", type=str, nargs=1) + _parsed_args = vars(_parser.parse_args()) + _output_files = _parsed_args.pop("_output_paths", []) + + _outputs = CsvExampleGen_GCS(**_parsed_args) + + if not hasattr(_outputs, '__getitem__') or isinstance(_outputs, str): + _outputs = [_outputs] + + _output_serializers = [ + str, + + ] + + import os + for idx, output_file in enumerate(_output_files): + try: + os.makedirs(os.path.dirname(output_file)) + except OSError: + pass + with open(output_file, 'w') as f: + f.write(_output_serializers[idx](_outputs[idx])) + args: + - --input-base-path + - {inputValue: input_base_path} + - --example-artifacts-path + - {inputValue: example_artifacts_path} + - if: + cond: {isPresent: input_config} + then: + - --input-config + - {inputValue: input_config} + - if: + cond: {isPresent: output_config} + then: + - --output-config + - {inputValue: output_config} + - '----output-paths' + - {outputPath: example_artifacts} diff --git a/components/tfx/ExampleGen/ImportExampleGen/component.py b/components/tfx/ExampleGen/ImportExampleGen/component.py new file mode 100644 index 00000000000..02c4dac8a8f --- /dev/null +++ b/components/tfx/ExampleGen/ImportExampleGen/component.py @@ -0,0 +1,112 @@ +# flake8: noqa TODO + +from kfp.components import InputPath, OutputPath + + +def ImportExampleGen( + 
input_base_path: InputPath('ExternalPath'),
+    #input_path: InputPath('ExternalPath'),
+
+    example_artifacts_path: OutputPath('Examples'),
+
+    input_config: 'JsonObject: example_gen_pb2.Input' = None,
+    output_config: 'JsonObject: example_gen_pb2.Output' = None,
+):
+    """
+    Official TFX ImportExampleGen component.
+
+    The ImportExampleGen component takes TFRecord files with TF Example data
+    format, and generates train and eval examples for downstream components.
+    This component provides consistent and configurable partitioning, and it also
+    shuffles the dataset for ML best practice.
+
+    Args:
+      input_base: A Channel of 'ExternalPath' type, which includes one artifact
+        whose uri is an external directory with TFRecord files inside
+        (required).
+      #input: Forwards compatibility alias for the 'input_base' argument.
+      input_config: An example_gen_pb2.Input instance, providing input
+        configuration. If unset, the files under input_base will be treated as a
+        single split.
+      output_config: An example_gen_pb2.Output instance, providing output
+        configuration. If unset, default splits will be 'train' and 'eval' with
+        size 2:1.
+    Returns:
+      example_artifacts: Optional channel of 'ExamplesPath' for output train and
+        eval examples.
+
+    Raises:
+      RuntimeError: Only one of query and input_config should be set.
+    """
+    from tfx.components.example_gen.import_example_gen.component import ImportExampleGen
+    component_class = ImportExampleGen
+    input_channels_with_splits = {}
+    output_channels_with_splits = {'example_artifacts'}
+
+
+    import json
+    import os
+    from google.protobuf import json_format, message
+    from tfx.types import Artifact, channel_utils
+
+    arguments = locals().copy()
+
+    component_class_args = {}
+
+    for name, execution_parameter in component_class.SPEC_CLASS.PARAMETERS.items():
+        argument_value_obj = argument_value = arguments.get(name, None)
+        if argument_value is None:
+            continue
+        parameter_type = execution_parameter.type
+        if isinstance(parameter_type, type) and issubclass(parameter_type, message.Message): # execution_parameter.type can also be a tuple
+            argument_value_obj = parameter_type()
+            json_format.Parse(argument_value, argument_value_obj)
+        component_class_args[name] = argument_value_obj
+
+    for name, channel_parameter in component_class.SPEC_CLASS.INPUTS.items():
+        artifact_path = arguments[name + '_path']
+        artifacts = []
+        if name in input_channels_with_splits:
+            # Recovering splits
+            splits = sorted(os.listdir(artifact_path))
+            for split in splits:
+                artifact = Artifact(type_name=channel_parameter.type_name)
+                artifact.split = split
+                artifact.uri = os.path.join(artifact_path, split) + '/'
+                artifacts.append(artifact)
+        else:
+            artifact = Artifact(type_name=channel_parameter.type_name)
+            artifact.uri = artifact_path + '/' # ?
+ artifacts.append(artifact) + component_class_args[name] = channel_utils.as_channel(artifacts) + + component_class_instance = component_class(**component_class_args) + + input_dict = {name: channel.get() for name, channel in component_class_instance.inputs.get_all().items()} + output_dict = {name: channel.get() for name, channel in component_class_instance.outputs.get_all().items()} + exec_properties = component_class_instance.exec_properties + + # Generating paths for output artifacts + for name, artifacts in output_dict.items(): + base_artifact_path = arguments[name + '_path'] + for artifact in artifacts: + artifact.uri = os.path.join(base_artifact_path, artifact.split) # Default split is '' + + print('component instance: ' + str(component_class_instance)) + + #executor = component_class.EXECUTOR_SPEC.executor_class() # Same + executor = component_class_instance.executor_spec.executor_class() + executor.Do( + input_dict=input_dict, + output_dict=output_dict, + exec_properties=exec_properties, + ) + + +if __name__ == '__main__': + import kfp + kfp.components.func_to_container_op( + ImportExampleGen, + base_image='tensorflow/tfx:0.15.0', + output_component_file='component.yaml' + ) diff --git a/components/tfx/ExampleGen/ImportExampleGen/component.yaml b/components/tfx/ExampleGen/ImportExampleGen/component.yaml new file mode 100644 index 00000000000..8328e455b50 --- /dev/null +++ b/components/tfx/ExampleGen/ImportExampleGen/component.yaml @@ -0,0 +1,202 @@ +name: Importexamplegen +description: | + Official TFX ImportExampleGen component. + + The ImportExampleGen component takes TFRecord files with TF Example data + format, and generates train and eval examples for downsteam components. + This component provides consistent and configurable partition, and it also + shuffle the dataset for ML best practice. + + Args: + input_base: A Channel of 'ExternalPath' type, which includes one artifact + whose uri is an external directory with TFRecord files inside + (required). + #input: Forwards compatibility alias for the 'input_base' argument. + input_config: An example_gen_pb2.Input instance, providing input + configuration. If unset, the files under input_base will be treated as a + single split. + output_config: An example_gen_pb2.Output instance, providing output + configuration. If unset, default splits will be 'train' and 'eval' with + size 2:1. + Returns: + example_artifacts: Optional channel of 'ExamplesPath' for output train and + eval examples. + + Raises: + RuntimeError: Only one of query and input_config should be set. 
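The `input_config` and `output_config` parameters above arrive as JSON-serialized `example_gen_pb2` protos, which the embedded runner parses back with `json_format.Parse`. A minimal sketch of plausible values (the split names, patterns, and hash-bucket counts are illustrative, not taken from this PR):

```python
import json

# JSON forms of example_gen_pb2.Input and example_gen_pb2.Output; field
# names match the proto definitions and are accepted by json_format.Parse.
input_config = json.dumps({
    "splits": [
        {"name": "train", "pattern": "train/*"},
        {"name": "eval", "pattern": "eval/*"},
    ]
})
output_config = json.dumps({
    "split_config": {
        "splits": [
            {"name": "train", "hash_buckets": 2},
            {"name": "eval", "hash_buckets": 1},
        ]
    }
})
# These strings can be passed directly as the component's
# input_config / output_config arguments.
```

When left unset, the component falls back to the defaults described above: the files under `input_base` are treated as a single split, and the output is split 2:1 into 'train' and 'eval'.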
+inputs: +- name: input_base + type: ExternalPath +- name: input_config + type: 'JsonObject: example_gen_pb2.Input' + optional: true +- name: output_config + type: 'JsonObject: example_gen_pb2.Output' + optional: true +outputs: +- name: example_artifacts + type: Examples +implementation: + container: + image: tensorflow/tfx:0.15.0 + command: + - python3 + - -u + - -c + - | + class InputPath: + '''When creating component from function, InputPath should be used as function parameter annotation to tell the system to pass the *data file path* to the function instead of passing the actual data.''' + def __init__(self, type=None): + self.type = type + + def _make_parent_dirs_and_return_path(file_path: str): + import os + os.makedirs(os.path.dirname(file_path), exist_ok=True) + return file_path + + class OutputPath: + '''When creating component from function, OutputPath should be used as function parameter annotation to tell the system that the function wants to output data by writing it into a file with the given path instead of returning the data from the function.''' + def __init__(self, type=None): + self.type = type + + def ImportExampleGen( + input_base_path: InputPath('ExternalPath'), + #input_path: InputPath('ExternalPath'), + + example_artifacts_path: OutputPath('Examples'), + + input_config: 'JsonObject: example_gen_pb2.Input' = None, + output_config: 'JsonObject: example_gen_pb2.Output' = None, + ): + """ + Official TFX ImportExampleGen component. + + The ImportExampleGen component takes TFRecord files with TF Example data + format, and generates train and eval examples for downsteam components. + This component provides consistent and configurable partition, and it also + shuffle the dataset for ML best practice. + + Args: + input_base: A Channel of 'ExternalPath' type, which includes one artifact + whose uri is an external directory with TFRecord files inside + (required). + #input: Forwards compatibility alias for the 'input_base' argument. + input_config: An example_gen_pb2.Input instance, providing input + configuration. If unset, the files under input_base will be treated as a + single split. + output_config: An example_gen_pb2.Output instance, providing output + configuration. If unset, default splits will be 'train' and 'eval' with + size 2:1. + Returns: + example_artifacts: Optional channel of 'ExamplesPath' for output train and + eval examples. + + Raises: + RuntimeError: Only one of query and input_config should be set. 
+ """ + from tfx.components.example_gen.import_example_gen.component import ImportExampleGen + component_class = ImportExampleGen + input_channels_with_splits = {} + output_channels_with_splits = {'example_artifacts'} + + import json + import os + from google.protobuf import json_format, message + from tfx.types import Artifact, channel_utils + + arguments = locals().copy() + + component_class_args = {} + + for name, execution_parameter in component_class.SPEC_CLASS.PARAMETERS.items(): + argument_value_obj = argument_value = arguments.get(name, None) + if argument_value is None: + continue + parameter_type = execution_parameter.type + if isinstance(parameter_type, type) and issubclass(parameter_type, message.Message): # execution_parameter.type can also be a tuple + argument_value_obj = parameter_type() + json_format.Parse(argument_value, argument_value_obj) + component_class_args[name] = argument_value_obj + + for name, channel_parameter in component_class.SPEC_CLASS.INPUTS.items(): + artifact_path = arguments[name + '_path'] + artifacts = [] + if name in input_channels_with_splits: + # Recovering splits + splits = sorted(os.listdir(artifact_path)) + for split in splits: + artifact = Artifact(type_name=channel_parameter.type_name) + artifact.split = split + artifact.uri = os.path.join(artifact_path, split) + '/' + artifacts.append(artifact) + else: + artifact = Artifact(type_name=channel_parameter.type_name) + artifact.uri = artifact_path + '/' # ? + artifacts.append(artifact) + component_class_args[name] = channel_utils.as_channel(artifacts) + + component_class_instance = component_class(**component_class_args) + + input_dict = {name: channel.get() for name, channel in component_class_instance.inputs.get_all().items()} + output_dict = {name: channel.get() for name, channel in component_class_instance.outputs.get_all().items()} + exec_properties = component_class_instance.exec_properties + + # Generating paths for output artifacts + for name, artifacts in output_dict.items(): + base_artifact_path = arguments[name + '_path'] + for artifact in artifacts: + artifact.uri = os.path.join(base_artifact_path, artifact.split) # Default split is '' + + print('component instance: ' + str(component_class_instance)) + + #executor = component_class.EXECUTOR_SPEC.executor_class() # Same + executor = component_class_instance.executor_spec.executor_class() + executor.Do( + input_dict=input_dict, + output_dict=output_dict, + exec_properties=exec_properties, + ) + + import argparse + _parser = argparse.ArgumentParser(prog='Importexamplegen', description="Official TFX ImportExampleGen component.\n\n The ImportExampleGen component takes TFRecord files with TF Example data\n format, and generates train and eval examples for downsteam components.\n This component provides consistent and configurable partition, and it also\n shuffle the dataset for ML best practice.\n\n Args:\n input_base: A Channel of 'ExternalPath' type, which includes one artifact\n whose uri is an external directory with TFRecord files inside\n (required).\n #input: Forwards compatibility alias for the 'input_base' argument.\n input_config: An example_gen_pb2.Input instance, providing input\n configuration. If unset, the files under input_base will be treated as a\n single split.\n output_config: An example_gen_pb2.Output instance, providing output\n configuration. 
If unset, default splits will be 'train' and 'eval' with\n size 2:1.\n Returns:\n example_artifacts: Optional channel of 'ExamplesPath' for output train and\n eval examples.\n\n Raises:\n RuntimeError: Only one of query and input_config should be set.\n") + _parser.add_argument("--input-base", dest="input_base_path", type=str, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--input-config", dest="input_config", type=str, required=False, default=argparse.SUPPRESS) + _parser.add_argument("--output-config", dest="output_config", type=str, required=False, default=argparse.SUPPRESS) + _parser.add_argument("--example-artifacts", dest="example_artifacts_path", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + _output_files = _parsed_args.pop("_output_paths", []) + + _outputs = ImportExampleGen(**_parsed_args) + + if not hasattr(_outputs, '__getitem__') or isinstance(_outputs, str): + _outputs = [_outputs] + + _output_serializers = [ + + ] + + import os + for idx, output_file in enumerate(_output_files): + try: + os.makedirs(os.path.dirname(output_file)) + except OSError: + pass + with open(output_file, 'w') as f: + f.write(_output_serializers[idx](_outputs[idx])) + args: + - --input-base + - inputPath: input_base + - if: + cond: + isPresent: input_config + then: + - --input-config + - inputValue: input_config + - if: + cond: + isPresent: output_config + then: + - --output-config + - inputValue: output_config + - --example-artifacts + - outputPath: example_artifacts diff --git a/components/tfx/ExampleValidator/component.py b/components/tfx/ExampleValidator/component.py new file mode 100644 index 00000000000..3d1f350d55b --- /dev/null +++ b/components/tfx/ExampleValidator/component.py @@ -0,0 +1,117 @@ +from kfp.components import InputPath, OutputPath + + +def ExampleValidator( + stats_path: InputPath('ExampleStatistics'), + #statistics_path: InputPath('ExampleStatistics'), + schema_path: InputPath('Schema'), + + output_path: OutputPath('ExampleValidation'), +): + """ + A TFX component to validate input examples. + + The ExampleValidator component uses [Tensorflow Data + Validation](https://www.tensorflow.org/tfx/data_validation) to + validate the statistics of some splits on input examples against a schema. + + The ExampleValidator component identifies anomalies in training and serving + data. The component can be configured to detect different classes of anomalies + in the data. It can: + - perform validity checks by comparing data statistics against a schema that + codifies expectations of the user. + - detect data drift by looking at a series of data. + - detect changes in dataset-wide data (i.e., num_examples) across spans or + versions. + + Schema Based Example Validation + The ExampleValidator component identifies any anomalies in the example data by + comparing data statistics computed by the StatisticsGen component against a + schema. The schema codifies properties which the input data is expected to + satisfy, and is provided and maintained by the user. + + Please see https://www.tensorflow.org/tfx/data_validation for more details. + + Args: + stats: A Channel of 'ExampleStatisticsPath` type. This should contain at + least 'eval' split. Other splits are ignored currently. Will be + deprecated in the future for the `statistics` parameter. + #statistics: Future replacement of the 'stats' argument. + schema: A Channel of "SchemaPath' type. 
_required_ + Returns: + output: Output channel of 'ExampleValidationPath' type. + + Either `stats` or `statistics` must be present in the arguments. + """ + from tfx.components.example_validator.component import ExampleValidator + component_class = ExampleValidator + input_channels_with_splits = {'stats', 'statistics'} + output_channels_with_splits = {} + + + import json + import os + from google.protobuf import json_format, message + from tfx.types import Artifact, channel_utils + + arguments = locals().copy() + + component_class_args = {} + + for name, execution_parameter in component_class.SPEC_CLASS.PARAMETERS.items(): + argument_value_obj = argument_value = arguments.get(name, None) + if argument_value is None: + continue + parameter_type = execution_parameter.type + if isinstance(parameter_type, type) and issubclass(parameter_type, message.Message): # execution_parameter.type can also be a tuple + argument_value_obj = parameter_type() + json_format.Parse(argument_value, argument_value_obj) + component_class_args[name] = argument_value_obj + + for name, channel_parameter in component_class.SPEC_CLASS.INPUTS.items(): + artifact_path = arguments[name + '_path'] + artifacts = [] + if name in input_channels_with_splits: + # Recovering splits + splits = sorted(os.listdir(artifact_path)) + for split in splits: + artifact = Artifact(type_name=channel_parameter.type_name) + artifact.split = split + artifact.uri = os.path.join(artifact_path, split) + '/' + artifacts.append(artifact) + else: + artifact = Artifact(type_name=channel_parameter.type_name) + artifact.uri = artifact_path + '/' # ? + artifacts.append(artifact) + component_class_args[name] = channel_utils.as_channel(artifacts) + + component_class_instance = component_class(**component_class_args) + + input_dict = {name: channel.get() for name, channel in component_class_instance.inputs.get_all().items()} + output_dict = {name: channel.get() for name, channel in component_class_instance.outputs.get_all().items()} + exec_properties = component_class_instance.exec_properties + + # Generating paths for output artifacts + for name, artifacts in output_dict.items(): + base_artifact_path = arguments[name + '_path'] + for artifact in artifacts: + artifact.uri = os.path.join(base_artifact_path, artifact.split) # Default split is '' + + print('component instance: ' + str(component_class_instance)) + + #executor = component_class.EXECUTOR_SPEC.executor_class() # Same + executor = component_class_instance.executor_spec.executor_class() + executor.Do( + input_dict=input_dict, + output_dict=output_dict, + exec_properties=exec_properties, + ) + + +if __name__ == '__main__': + import kfp + kfp.components.func_to_container_op( + ExampleValidator, + base_image='tensorflow/tfx:0.15.0', + output_component_file='component.yaml' + ) diff --git a/components/tfx/ExampleValidator/component.yaml b/components/tfx/ExampleValidator/component.yaml new file mode 100644 index 00000000000..0c7036a2ef3 --- /dev/null +++ b/components/tfx/ExampleValidator/component.yaml @@ -0,0 +1,203 @@ +name: Examplevalidator +description: | + A TFX component to validate input examples. + + The ExampleValidator component uses [Tensorflow Data + Validation](https://www.tensorflow.org/tfx/data_validation) to + validate the statistics of some splits on input examples against a schema. + + The ExampleValidator component identifies anomalies in training and serving + data. The component can be configured to detect different classes of anomalies + in the data. 
It can: + - perform validity checks by comparing data statistics against a schema that + codifies expectations of the user. + - detect data drift by looking at a series of data. + - detect changes in dataset-wide data (i.e., num_examples) across spans or + versions. + + Schema Based Example Validation + The ExampleValidator component identifies any anomalies in the example data by + comparing data statistics computed by the StatisticsGen component against a + schema. The schema codifies properties which the input data is expected to + satisfy, and is provided and maintained by the user. + + Please see https://www.tensorflow.org/tfx/data_validation for more details. + + Args: + stats: A Channel of 'ExampleStatisticsPath` type. This should contain at + least 'eval' split. Other splits are ignored currently. Will be + deprecated in the future for the `statistics` parameter. + #statistics: Future replacement of the 'stats' argument. + schema: A Channel of "SchemaPath' type. _required_ + Returns: + output: Output channel of 'ExampleValidationPath' type. + + Either `stats` or `statistics` must be present in the arguments. +inputs: +- name: stats + type: ExampleStatistics +- name: schema + type: Schema +outputs: +- name: output + type: ExampleValidation +implementation: + container: + image: tensorflow/tfx:0.15.0 + command: + - python3 + - -u + - -c + - | + class InputPath: + '''When creating component from function, InputPath should be used as function parameter annotation to tell the system to pass the *data file path* to the function instead of passing the actual data.''' + def __init__(self, type=None): + self.type = type + + def _make_parent_dirs_and_return_path(file_path: str): + import os + os.makedirs(os.path.dirname(file_path), exist_ok=True) + return file_path + + class OutputPath: + '''When creating component from function, OutputPath should be used as function parameter annotation to tell the system that the function wants to output data by writing it into a file with the given path instead of returning the data from the function.''' + def __init__(self, type=None): + self.type = type + + def ExampleValidator( + stats_path: InputPath('ExampleStatistics'), + #statistics_path: InputPath('ExampleStatistics'), + schema_path: InputPath('Schema'), + + output_path: OutputPath('ExampleValidation'), + ): + """ + A TFX component to validate input examples. + + The ExampleValidator component uses [Tensorflow Data + Validation](https://www.tensorflow.org/tfx/data_validation) to + validate the statistics of some splits on input examples against a schema. + + The ExampleValidator component identifies anomalies in training and serving + data. The component can be configured to detect different classes of anomalies + in the data. It can: + - perform validity checks by comparing data statistics against a schema that + codifies expectations of the user. + - detect data drift by looking at a series of data. + - detect changes in dataset-wide data (i.e., num_examples) across spans or + versions. + + Schema Based Example Validation + The ExampleValidator component identifies any anomalies in the example data by + comparing data statistics computed by the StatisticsGen component against a + schema. The schema codifies properties which the input data is expected to + satisfy, and is provided and maintained by the user. + + Please see https://www.tensorflow.org/tfx/data_validation for more details. + + Args: + stats: A Channel of 'ExampleStatisticsPath` type. This should contain at + least 'eval' split. 
Other splits are ignored currently. Will be + deprecated in the future for the `statistics` parameter. + #statistics: Future replacement of the 'stats' argument. + schema: A Channel of "SchemaPath' type. _required_ + Returns: + output: Output channel of 'ExampleValidationPath' type. + + Either `stats` or `statistics` must be present in the arguments. + """ + from tfx.components.example_validator.component import ExampleValidator + component_class = ExampleValidator + input_channels_with_splits = {'stats', 'statistics'} + output_channels_with_splits = {} + + import json + import os + from google.protobuf import json_format, message + from tfx.types import Artifact, channel_utils + + arguments = locals().copy() + + component_class_args = {} + + for name, execution_parameter in component_class.SPEC_CLASS.PARAMETERS.items(): + argument_value_obj = argument_value = arguments.get(name, None) + if argument_value is None: + continue + parameter_type = execution_parameter.type + if isinstance(parameter_type, type) and issubclass(parameter_type, message.Message): # execution_parameter.type can also be a tuple + argument_value_obj = parameter_type() + json_format.Parse(argument_value, argument_value_obj) + component_class_args[name] = argument_value_obj + + for name, channel_parameter in component_class.SPEC_CLASS.INPUTS.items(): + artifact_path = arguments[name + '_path'] + artifacts = [] + if name in input_channels_with_splits: + # Recovering splits + splits = sorted(os.listdir(artifact_path)) + for split in splits: + artifact = Artifact(type_name=channel_parameter.type_name) + artifact.split = split + artifact.uri = os.path.join(artifact_path, split) + '/' + artifacts.append(artifact) + else: + artifact = Artifact(type_name=channel_parameter.type_name) + artifact.uri = artifact_path + '/' # ? + artifacts.append(artifact) + component_class_args[name] = channel_utils.as_channel(artifacts) + + component_class_instance = component_class(**component_class_args) + + input_dict = {name: channel.get() for name, channel in component_class_instance.inputs.get_all().items()} + output_dict = {name: channel.get() for name, channel in component_class_instance.outputs.get_all().items()} + exec_properties = component_class_instance.exec_properties + + # Generating paths for output artifacts + for name, artifacts in output_dict.items(): + base_artifact_path = arguments[name + '_path'] + for artifact in artifacts: + artifact.uri = os.path.join(base_artifact_path, artifact.split) # Default split is '' + + print('component instance: ' + str(component_class_instance)) + + #executor = component_class.EXECUTOR_SPEC.executor_class() # Same + executor = component_class_instance.executor_spec.executor_class() + executor.Do( + input_dict=input_dict, + output_dict=output_dict, + exec_properties=exec_properties, + ) + + import argparse + _parser = argparse.ArgumentParser(prog='Examplevalidator', description='A TFX component to validate input examples.\n\n The ExampleValidator component uses [Tensorflow Data\n Validation](https://www.tensorflow.org/tfx/data_validation) to\n validate the statistics of some splits on input examples against a schema.\n\n The ExampleValidator component identifies anomalies in training and serving\n data. The component can be configured to detect different classes of anomalies\n in the data. 
It can:\n - perform validity checks by comparing data statistics against a schema that\n codifies expectations of the user.\n - detect data drift by looking at a series of data.\n - detect changes in dataset-wide data (i.e., num_examples) across spans or\n versions.\n\n Schema Based Example Validation\n The ExampleValidator component identifies any anomalies in the example data by\n comparing data statistics computed by the StatisticsGen component against a\n schema. The schema codifies properties which the input data is expected to\n satisfy, and is provided and maintained by the user.\n\n Please see https://www.tensorflow.org/tfx/data_validation for more details.\n\n Args:\n stats: A Channel of \'ExampleStatisticsPath` type. This should contain at\n least \'eval\' split. Other splits are ignored currently. Will be\n deprecated in the future for the `statistics` parameter.\n #statistics: Future replacement of the \'stats\' argument.\n schema: A Channel of "SchemaPath\' type. _required_\n Returns:\n output: Output channel of \'ExampleValidationPath\' type.\n\n Either `stats` or `statistics` must be present in the arguments.\n') + _parser.add_argument("--stats", dest="stats_path", type=str, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--schema", dest="schema_path", type=str, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--output", dest="output_path", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + _output_files = _parsed_args.pop("_output_paths", []) + + _outputs = ExampleValidator(**_parsed_args) + + if not hasattr(_outputs, '__getitem__') or isinstance(_outputs, str): + _outputs = [_outputs] + + _output_serializers = [ + + ] + + import os + for idx, output_file in enumerate(_output_files): + try: + os.makedirs(os.path.dirname(output_file)) + except OSError: + pass + with open(output_file, 'w') as f: + f.write(_output_serializers[idx](_outputs[idx])) + args: + - --stats + - inputPath: stats + - --schema + - inputPath: schema + - --output + - outputPath: output diff --git a/components/tfx/SchemaGen/component.py b/components/tfx/SchemaGen/component.py new file mode 100644 index 00000000000..ba630436e5c --- /dev/null +++ b/components/tfx/SchemaGen/component.py @@ -0,0 +1,80 @@ +from kfp.components import InputPath, OutputPath + + +def SchemaGen( + stats_path: InputPath('ExampleStatistics'), + #statistics_path: InputPath('ExampleStatistics'), + output_path: OutputPath('Schema'), + #schema_path: InputPath('Schema') = None, + infer_feature_shape: bool = False, +): + """Constructs a SchemaGen component. + + Args: + stats: A Channel of `ExampleStatistics` type (required if spec is not + passed). This should contain at least a `train` split. Other splits are + currently ignored. + # Exactly one of 'stats'/'statistics' or 'schema' is required. + #schema: A Channel of `Schema` type that provides an instance of Schema. + # If provided, pass through this schema artifact as the output. Exactly + # one of 'stats'/'statistics' or 'schema' is required. + infer_feature_shape: Boolean value indicating whether or not to infer the + shape of features. If the feature shape is not inferred, downstream + Tensorflow Transform component using the schema will parse input + as tf.SparseTensor. + #statistics: Future replacement of the 'stats' argument. + #Either `statistics` or `stats` must be present in the input arguments. + Returns: + output: Output `Schema` channel for schema result. 
+ """ + + import json + import os + from google.protobuf import json_format + from tfx.types import standard_artifacts + from tfx.types import channel_utils + + # Create input dict. + input_base_path = stats_path + input_artifact_class = standard_artifacts.ExampleStatistics + # Recovering splits + splits = sorted(os.listdir(input_base_path)) + input_data_artifacts = [] + for split in splits: + artifact = input_artifact_class() + artifact.split = split + artifact.uri = os.path.join(input_base_path, split) + '/' + input_data_artifacts.append(artifact) + input_data_channel = channel_utils.as_channel(input_data_artifacts) + + from tfx.components.schema_gen.component import SchemaGen + component_class_instance = SchemaGen( + stats=input_data_channel, + ) + + input_dict = {name: channel.get() for name, channel in component_class_instance.inputs.get_all().items()} + output_dict = {name: channel.get() for name, channel in component_class_instance.outputs.get_all().items()} + exec_properties = component_class_instance.exec_properties + + # Generating paths for output artifacts + for output_artifact in output_dict['output']: + output_artifact.uri = os.path.join(output_path, output_artifact.split) # Default split is '' + + print('component instance: ' + str(component_class_instance)) + + executor = component_class_instance.executor_spec.executor_class() + executor.Do( + input_dict=input_dict, + output_dict=output_dict, + exec_properties=exec_properties, + ) + #return (output_path,) + + +if __name__ == '__main__': + import kfp + kfp.components.func_to_container_op( + SchemaGen, + base_image='tensorflow/tfx:0.15.0', + output_component_file='component.yaml' + ) diff --git a/components/tfx/SchemaGen/component.yaml b/components/tfx/SchemaGen/component.yaml new file mode 100644 index 00000000000..dc0a5a32a5f --- /dev/null +++ b/components/tfx/SchemaGen/component.yaml @@ -0,0 +1,156 @@ +name: Schemagen +inputs: +- {name: stats, type: ExampleStatistics} +- {default: 'False', name: infer_feature_shape, optional: true, type: Boolean} +outputs: +- {name: output, type: Schema} +description: | + Constructs a SchemaGen component. + + Args: + stats: A Channel of `ExampleStatistics` type (required if spec is not + passed). This should contain at least a `train` split. Other splits are + currently ignored. + # Exactly one of 'stats'/'statistics' or 'schema' is required. + #schema: A Channel of `Schema` type that provides an instance of Schema. + # If provided, pass through this schema artifact as the output. Exactly + # one of 'stats'/'statistics' or 'schema' is required. + infer_feature_shape: Boolean value indicating whether or not to infer the + shape of features. If the feature shape is not inferred, downstream + Tensorflow Transform component using the schema will parse input + as tf.SparseTensor. + #statistics: Future replacement of the 'stats' argument. + #Either `statistics` or `stats` must be present in the input arguments. + Returns: + output: Output `Schema` channel for schema result. 
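Loaded into a pipeline definition, these files chain together in the usual KFP way. A minimal sketch, assuming the component paths below (relative to the repository root) and leaving all optional configs unset; the sample notebook added by this PR is the authoritative end-to-end example:

```python
import kfp
from kfp import components

# Hypothetical wiring of the generated component definitions.
csv_example_gen_op = components.load_component_from_file(
    'components/tfx/ExampleGen/CsvExampleGen/component.yaml')
statistics_gen_op = components.load_component_from_file(
    'components/tfx/StatisticsGen/component.yaml')
schema_gen_op = components.load_component_from_file(
    'components/tfx/SchemaGen/component.yaml')

@kfp.dsl.pipeline(name='tfx-schema-sketch')
def pipeline(input_data):
    # Each task consumes the upstream task's output artifact directory.
    examples = csv_example_gen_op(input_base=input_data)
    statistics = statistics_gen_op(
        input_data=examples.outputs['example_artifacts'])
    schema = schema_gen_op(stats=statistics.outputs['output'])
```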
+implementation: + container: + image: tensorflow/tfx:0.15.0 + command: + - python3 + - -u + - -c + - | + class OutputPath: + '''When creating component from function, OutputPath should be used as function parameter annotation to tell the system that the function wants to output data by writing it into a file with the given path instead of returning the data from the function.''' + def __init__(self, type=None): + self.type = type + + class InputPath: + '''When creating component from function, InputPath should be used as function parameter annotation to tell the system to pass the *data file path* to the function instead of passing the actual data.''' + def __init__(self, type=None): + self.type = type + + def _make_parent_dirs_and_return_path(file_path: str): + import os + os.makedirs(os.path.dirname(file_path), exist_ok=True) + return file_path + + def SchemaGen( + stats_path: InputPath('ExampleStatistics'), + #statistics_path: InputPath('ExampleStatistics'), + output_path: OutputPath('Schema'), + #schema_path: InputPath('Schema') = None, + infer_feature_shape: bool = False, + ): + """Constructs a SchemaGen component. + + Args: + stats: A Channel of `ExampleStatistics` type (required if spec is not + passed). This should contain at least a `train` split. Other splits are + currently ignored. + # Exactly one of 'stats'/'statistics' or 'schema' is required. + #schema: A Channel of `Schema` type that provides an instance of Schema. + # If provided, pass through this schema artifact as the output. Exactly + # one of 'stats'/'statistics' or 'schema' is required. + infer_feature_shape: Boolean value indicating whether or not to infer the + shape of features. If the feature shape is not inferred, downstream + Tensorflow Transform component using the schema will parse input + as tf.SparseTensor. + #statistics: Future replacement of the 'stats' argument. + #Either `statistics` or `stats` must be present in the input arguments. + Returns: + output: Output `Schema` channel for schema result. + """ + + import json + import os + from google.protobuf import json_format + from tfx.types import standard_artifacts + from tfx.types import channel_utils + + # Create input dict. 
+ input_base_path = stats_path + input_artifact_class = standard_artifacts.ExampleStatistics + # Recovering splits + splits = sorted(os.listdir(input_base_path)) + input_data_artifacts = [] + for split in splits: + artifact = input_artifact_class() + artifact.split = split + artifact.uri = os.path.join(input_base_path, split) + '/' + input_data_artifacts.append(artifact) + input_data_channel = channel_utils.as_channel(input_data_artifacts) + + from tfx.components.schema_gen.component import SchemaGen + component_class_instance = SchemaGen( + stats=input_data_channel, + ) + + input_dict = {name: channel.get() for name, channel in component_class_instance.inputs.get_all().items()} + output_dict = {name: channel.get() for name, channel in component_class_instance.outputs.get_all().items()} + exec_properties = component_class_instance.exec_properties + + # Generating paths for output artifacts + for output_artifact in output_dict['output']: + output_artifact.uri = os.path.join(output_path, output_artifact.split) # Default split is '' + + print('component instance: ' + str(component_class_instance)) + + executor = component_class_instance.executor_spec.executor_class() + executor.Do( + input_dict=input_dict, + output_dict=output_dict, + exec_properties=exec_properties, + ) + #return (output_path,) + + def _deserialize_bool(s) -> bool: + from distutils.util import strtobool + return strtobool(s) == 1 + + import argparse + _parser = argparse.ArgumentParser(prog='Schemagen', description="Constructs a SchemaGen component.\n\n Args:\n stats: A Channel of `ExampleStatistics` type (required if spec is not\n passed). This should contain at least a `train` split. Other splits are\n currently ignored.\n # Exactly one of 'stats'/'statistics' or 'schema' is required.\n #schema: A Channel of `Schema` type that provides an instance of Schema.\n # If provided, pass through this schema artifact as the output. Exactly\n # one of 'stats'/'statistics' or 'schema' is required.\n infer_feature_shape: Boolean value indicating whether or not to infer the\n shape of features. 
If the feature shape is not inferred, downstream\n Tensorflow Transform component using the schema will parse input\n as tf.SparseTensor.\n #statistics: Future replacement of the 'stats' argument.\n #Either `statistics` or `stats` must be present in the input arguments.\n Returns:\n output: Output `Schema` channel for schema result.\n") + _parser.add_argument("--stats", dest="stats_path", type=str, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--infer-feature-shape", dest="infer_feature_shape", type=_deserialize_bool, required=False, default=argparse.SUPPRESS) + _parser.add_argument("--output", dest="output_path", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + _output_files = _parsed_args.pop("_output_paths", []) + + _outputs = SchemaGen(**_parsed_args) + + if not hasattr(_outputs, '__getitem__') or isinstance(_outputs, str): + _outputs = [_outputs] + + _output_serializers = [ + + ] + + import os + for idx, output_file in enumerate(_output_files): + try: + os.makedirs(os.path.dirname(output_file)) + except OSError: + pass + with open(output_file, 'w') as f: + f.write(_output_serializers[idx](_outputs[idx])) + args: + - --stats + - {inputPath: stats} + - if: + cond: {isPresent: infer_feature_shape} + then: + - --infer-feature-shape + - {inputValue: infer_feature_shape} + - --output + - {outputPath: output} diff --git a/components/tfx/StatisticsGen/component.py b/components/tfx/StatisticsGen/component.py new file mode 100644 index 00000000000..acdffed8eab --- /dev/null +++ b/components/tfx/StatisticsGen/component.py @@ -0,0 +1,77 @@ +from kfp.components import InputPath, OutputPath + + +def StatisticsGen( + # Inputs + input_data_path: InputPath('Examples'), + #input_data_path: 'ExamplesPath', + + # Outputs + output_path: OutputPath('ExampleStatistics'), + #output_path: 'ExampleStatistics', +): +#) -> NamedTuple('Outputs', [ +# ('output', 'ExampleStatistics'), +#]): + """Construct a StatisticsGen component. + + Args: + input_data: A Channel of `Examples` type, likely generated by the + [ExampleGen component](https://www.tensorflow.org/tfx/guide/examplegen). + This needs to contain two splits labeled `train` and `eval`. _required_ + # examples: Forwards compatibility alias for the `input_data` argument. + Returns: + output: `ExampleStatistics` channel for statistics of each split + provided in the input examples. + """ + + import json + import os + from google.protobuf import json_format + from tfx.types import standard_artifacts + from tfx.types import channel_utils + + # Create input dict. 
+ input_base_path = input_data_path + input_artifact_class = standard_artifacts.Examples + # Recovering splits + splits = sorted(os.listdir(input_data_path)) + input_data_artifacts = [] + for split in splits: + artifact = input_artifact_class() + artifact.split = split + artifact.uri = os.path.join(input_base_path, split) + '/' + input_data_artifacts.append(artifact) + input_data_channel = channel_utils.as_channel(input_data_artifacts) + + from tfx.components.statistics_gen.component import StatisticsGen + component_class_instance = StatisticsGen( + input_data=input_data_channel, + ) + + input_dict = {name: channel.get() for name, channel in component_class_instance.inputs.get_all().items()} + output_dict = {name: channel.get() for name, channel in component_class_instance.outputs.get_all().items()} + exec_properties = component_class_instance.exec_properties + + # Generating paths for output artifacts + for output_artifact in output_dict['output']: + output_artifact.uri = os.path.join(output_path, output_artifact.split) # Default split is '' + + print('Component instance: ' + str(component_class_instance)) + + executor = component_class_instance.executor_spec.executor_class() + executor.Do( + input_dict=input_dict, + output_dict=output_dict, + exec_properties=exec_properties, + ) + #return (output_path,) + + +if __name__ == '__main__': + import kfp + kfp.components.func_to_container_op( + StatisticsGen, + base_image='tensorflow/tfx:0.15.0', + output_component_file='component.yaml' + ) diff --git a/components/tfx/StatisticsGen/component.yaml b/components/tfx/StatisticsGen/component.yaml new file mode 100644 index 00000000000..5d21d7d3d0f --- /dev/null +++ b/components/tfx/StatisticsGen/component.yaml @@ -0,0 +1,136 @@ +name: Statisticsgen +inputs: +- name: input_data + type: Examples +outputs: +- name: output + type: ExampleStatistics +description: | + Construct a StatisticsGen component. + + Args: + input_data: A Channel of `ExamplesPath` type, likely generated by the + [ExampleGen component](https://www.tensorflow.org/tfx/guide/examplegen). + This needs to contain two splits labeled `train` and `eval`. _required_ + # examples: Forwards compatibility alias for the `input_data` argument. + Returns: + output: `ExampleStatistics` channel for statistics of each split + provided in the input examples. +implementation: + container: + image: tensorflow/tfx:0.15.0 + command: + - python3 + - -u + - -c + - | + class OutputPath: + '''When creating component from function, OutputPath should be used as function parameter annotation to tell the system that the function wants to output data by writing it into a file with the given path instead of returning the data from the function.''' + def __init__(self, type=None): + self.type = type + + class InputPath: + '''When creating component from function, InputPath should be used as function parameter annotation to tell the system to pass the *data file path* to the function instead of passing the actual data.''' + def __init__(self, type=None): + self.type = type + + def _make_parent_dirs_and_return_path(file_path: str): + import os + os.makedirs(os.path.dirname(file_path), exist_ok=True) + return file_path + + def StatisticsGen( + # Inputs + input_data_path: InputPath('ExamplesPath'), + #input_data_path: 'ExamplesPath', + + # Outputs + output_path: OutputPath('ExampleStatistics'), + #output_path: 'ExampleStatistics', + ): + #) -> NamedTuple('Outputs', [ + # ('output', 'ExampleStatistics'), + #]): + """Construct a StatisticsGen component. 
+ + Args: + input_data: A Channel of `ExamplesPath` type, likely generated by the + [ExampleGen component](https://www.tensorflow.org/tfx/guide/examplegen). + This needs to contain two splits labeled `train` and `eval`. _required_ + # examples: Forwards compatibility alias for the `input_data` argument. + Returns: + output: `ExampleStatistics` channel for statistics of each split + provided in the input examples. + """ + + import json + import os + from google.protobuf import json_format + from tfx.types import standard_artifacts + from tfx.types import channel_utils + + # Create input dict. + input_base_path = input_data_path + input_artifact_class = standard_artifacts.Examples + # Recovering splits + splits = sorted(os.listdir(input_data_path)) + input_data_artifacts = [] + for split in splits: + artifact = input_artifact_class() + artifact.split = split + artifact.uri = os.path.join(input_base_path, split) + '/' + input_data_artifacts.append(artifact) + input_data_channel = channel_utils.as_channel(input_data_artifacts) + + from tfx.components.statistics_gen.component import StatisticsGen + component_class_instance = StatisticsGen( + input_data=input_data_channel, + ) + + input_dict = {name: channel.get() for name, channel in component_class_instance.inputs.get_all().items()} + output_dict = {name: channel.get() for name, channel in component_class_instance.outputs.get_all().items()} + exec_properties = component_class_instance.exec_properties + + # Generating paths for output artifacts + for output_artifact in output_dict['output']: + output_artifact.uri = os.path.join(output_path, output_artifact.split) # Default split is '' + + print('Component instance: ' + str(component_class_instance)) + + executor = component_class_instance.executor_spec.executor_class() + executor.Do( + input_dict=input_dict, + output_dict=output_dict, + exec_properties=exec_properties, + ) + #return (output_path,) + + import argparse + _parser = argparse.ArgumentParser(prog='Statisticsgen', description='Construct a StatisticsGen component.\n\n Args:\n input_data: A Channel of `ExamplesPath` type, likely generated by the\n [ExampleGen component](https://www.tensorflow.org/tfx/guide/examplegen).\n This needs to contain two splits labeled `train` and `eval`. 
_required_\n # examples: Forwards compatibility alias for the `input_data` argument.\n Returns:\n output: `ExampleStatistics` channel for statistics of each split\n provided in the input examples.\n') + _parser.add_argument("--input-data", dest="input_data_path", type=str, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--output", dest="output_path", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + _output_files = _parsed_args.pop("_output_paths", []) + + _outputs = StatisticsGen(**_parsed_args) + + if not hasattr(_outputs, '__getitem__') or isinstance(_outputs, str): + _outputs = [_outputs] + + _output_serializers = [ + + ] + + import os + for idx, output_file in enumerate(_output_files): + try: + os.makedirs(os.path.dirname(output_file)) + except OSError: + pass + with open(output_file, 'w') as f: + f.write(_output_serializers[idx](_outputs[idx])) + args: + - --input-data + - inputPath: input_data + - --output + - outputPath: output diff --git a/components/tfx/Trainer/component.py b/components/tfx/Trainer/component.py new file mode 100644 index 00000000000..d5cc637aa5f --- /dev/null +++ b/components/tfx/Trainer/component.py @@ -0,0 +1,173 @@ +# flake8: noqa TODO + +from kfp.components import InputPath, OutputPath + + +def Trainer( + examples_path: InputPath('Examples'), + transform_output_path: InputPath('TransformGraph'), # ? = None + #transform_graph_path: InputPath('TransformGraph'), + schema_path: InputPath('Schema'), + + output_path: OutputPath('Model'), + + module_file: str = None, + trainer_fn: str = None, + train_args: 'JsonObject: tfx.proto.trainer_pb2.TrainArgs' = None, + eval_args: 'JsonObject: tfx.proto.trainer_pb2.EvalArgs' = None, + #custom_config: dict = None, + #custom_executor_spec: Optional[executor_spec.ExecutorSpec] = None, +): + """ + A TFX component to train a TensorFlow model. + + The Trainer component is used to train and eval a model using given inputs and + a user-supplied estimator. This component includes a custom driver to + optionally grab previous model to warm start from. + + ## Providing an estimator + The TFX executor will use the estimator provided in the `module_file` file + to train the model. The Trainer executor will look specifically for the + `trainer_fn()` function within that file. Before training, the executor will + call that function expecting the following returned as a dictionary: + + - estimator: The + [estimator](https://www.tensorflow.org/api_docs/python/tf/estimator/Estimator) + to be used by TensorFlow to train the model. + - train_spec: The + [configuration](https://www.tensorflow.org/api_docs/python/tf/estimator/TrainSpec) + to be used by the "train" part of the TensorFlow `train_and_evaluate()` + call. + - eval_spec: The + [configuration](https://www.tensorflow.org/api_docs/python/tf/estimator/EvalSpec) + to be used by the "eval" part of the TensorFlow `train_and_evaluate()` call. + - eval_input_receiver_fn: The + [configuration](https://www.tensorflow.org/tfx/model_analysis/get_started#modify_an_existing_model) + to be used + by the [ModelValidator](https://www.tensorflow.org/tfx/guide/modelval) + component when validating the model. + + An example of `trainer_fn()` can be found in the [user-supplied + code]((https://github.com/tensorflow/tfx/blob/master/tfx/examples/chicago_taxi_pipeline/taxi_utils.py)) + of the TFX Chicago Taxi pipeline example. 
+
+
+    Args:
+      examples: A Channel of 'ExamplesPath' type, serving as the source of
+        examples that are used in training (required). May be raw or
+        transformed.
+      transform_output: An optional Channel of 'TransformPath' type, serving as
+        the input transform graph if present.
+      #transform_graph: Forwards compatibility alias for the 'transform_output'
+      #  argument.
+      schema: A Channel of 'SchemaPath' type, serving as the schema of training
+        and eval data.
+      module_file: A path to a python module file containing UDF model definition.
+        The module_file must implement a function named `trainer_fn` at its
+        top level. The function must have the following signature.
+
+        def trainer_fn(tf.contrib.training.HParams,
+                       tensorflow_metadata.proto.v0.schema_pb2) -> Dict:
+          ...
+
+        where the returned Dict has the following key-values.
+          'estimator': an instance of tf.estimator.Estimator
+          'train_spec': an instance of tf.estimator.TrainSpec
+          'eval_spec': an instance of tf.estimator.EvalSpec
+          'eval_input_receiver_fn': an instance of tfma.export.EvalInputReceiver
+
+        Exactly one of 'module_file' or 'trainer_fn' must be supplied.
+      trainer_fn: A python path to UDF model definition function. See
+        'module_file' for the required signature of the UDF.
+        Exactly one of 'module_file' or 'trainer_fn' must be supplied.
+      train_args: A trainer_pb2.TrainArgs instance, containing args used for
+        training. Currently only num_steps is available.
+      eval_args: A trainer_pb2.EvalArgs instance, containing args used for eval.
+        Currently only num_steps is available.
+      #custom_config: A dict which contains the training job parameters to be
+      #  passed to Google Cloud ML Engine. For the full set of parameters
+      #  supported by Google Cloud ML Engine, refer to
+      #  https://cloud.google.com/ml-engine/reference/rest/v1/projects.jobs#Job
+      #custom_executor_spec: Optional custom executor spec.
+    Returns:
+      output: Optional 'ModelExportPath' channel for result of exported models.
+    Raises:
+      ValueError:
+        - When both or neither of 'module_file' and 'trainer_fn' is supplied.
+        - When both or neither of 'examples' and 'transformed_examples'
+          is supplied.
+        - When 'transformed_examples' is supplied but 'transform_output'
+          is not supplied.
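+
+      For example, train_args='{"num_steps": 10000}' and
+      eval_args='{"num_steps": 5000}' are accepted, since proto-typed
+      parameters are passed as JSON strings and parsed with
+      json_format.Parse before being handed to the executor.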
+ """ + from tfx.components.trainer.component import Trainer + component_class = Trainer + input_channels_with_splits = {'examples'} + output_channels_with_splits = {} + + + import json + import os + from google.protobuf import json_format, message + from tfx.types import Artifact, channel_utils + + arguments = locals().copy() + + component_class_args = {} + + for name, execution_parameter in component_class.SPEC_CLASS.PARAMETERS.items(): + argument_value_obj = argument_value = arguments.get(name, None) + if argument_value is None: + continue + parameter_type = execution_parameter.type + if isinstance(parameter_type, type) and issubclass(parameter_type, message.Message): # execution_parameter.type can also be a tuple + argument_value_obj = parameter_type() + json_format.Parse(argument_value, argument_value_obj) + component_class_args[name] = argument_value_obj + + for name, channel_parameter in component_class.SPEC_CLASS.INPUTS.items(): + artifact_path = arguments[name + '_path'] + artifacts = [] + if name in input_channels_with_splits: + # Recovering splits + splits = sorted(os.listdir(artifact_path)) + for split in splits: + artifact = Artifact(type_name=channel_parameter.type_name) + artifact.split = split + artifact.uri = os.path.join(artifact_path, split) + '/' + artifacts.append(artifact) + else: + artifact = Artifact(type_name=channel_parameter.type_name) + artifact.uri = artifact_path + '/' # ? + artifacts.append(artifact) + component_class_args[name] = channel_utils.as_channel(artifacts) + + component_class_instance = component_class(**component_class_args) + + input_dict = {name: channel.get() for name, channel in component_class_instance.inputs.get_all().items()} + output_dict = {name: channel.get() for name, channel in component_class_instance.outputs.get_all().items()} + exec_properties = component_class_instance.exec_properties + + # Generating paths for output artifacts + for name, artifacts in output_dict.items(): + base_artifact_path = arguments[name + '_path'] + for artifact in artifacts: + artifact.uri = os.path.join(base_artifact_path, artifact.split) # Default split is '' + + print('component instance: ' + str(component_class_instance)) + + #executor = component_class.EXECUTOR_SPEC.executor_class() # Same + executor = component_class_instance.executor_spec.executor_class() + executor.Do( + input_dict=input_dict, + output_dict=output_dict, + exec_properties=exec_properties, + ) + + +if __name__ == '__main__': + import kfp + kfp.components.func_to_container_op( + Trainer, + base_image='tensorflow/tfx:0.15.0', + output_component_file='component.yaml' + ) diff --git a/components/tfx/Trainer/component.yaml b/components/tfx/Trainer/component.yaml new file mode 100644 index 00000000000..0e45ee12ceb --- /dev/null +++ b/components/tfx/Trainer/component.yaml @@ -0,0 +1,347 @@ +name: Trainer +description: | + A TFX component to train a TensorFlow model. + + The Trainer component is used to train and eval a model using given inputs and + a user-supplied estimator. This component includes a custom driver to + optionally grab previous model to warm start from. + + ## Providing an estimator + The TFX executor will use the estimator provided in the `module_file` file + to train the model. The Trainer executor will look specifically for the + `trainer_fn()` function within that file. 
Before training, the executor will + call that function expecting the following returned as a dictionary: + + - estimator: The + [estimator](https://www.tensorflow.org/api_docs/python/tf/estimator/Estimator) + to be used by TensorFlow to train the model. + - train_spec: The + [configuration](https://www.tensorflow.org/api_docs/python/tf/estimator/TrainSpec) + to be used by the "train" part of the TensorFlow `train_and_evaluate()` + call. + - eval_spec: The + [configuration](https://www.tensorflow.org/api_docs/python/tf/estimator/EvalSpec) + to be used by the "eval" part of the TensorFlow `train_and_evaluate()` call. + - eval_input_receiver_fn: The + [configuration](https://www.tensorflow.org/tfx/model_analysis/get_started#modify_an_existing_model) + to be used + by the [ModelValidator](https://www.tensorflow.org/tfx/guide/modelval) + component when validating the model. + + An example of `trainer_fn()` can be found in the [user-supplied + code]((https://github.com/tensorflow/tfx/blob/master/tfx/examples/chicago_taxi_pipeline/taxi_utils.py)) + of the TFX Chicago Taxi pipeline example. + + + Args: + examples: A Channel of 'ExamplesPath' type, serving as the source of + examples that are used in training (required). May be raw or + transformed. + transform_output: An optional Channel of 'TransformPath' type, serving as + the input transform graph if present. + #transform_graph: Forwards compatibility alias for the 'transform_output' + # argument. + schema: A Channel of 'SchemaPath' type, serving as the schema of training + and eval data. + module_file: A path to python module file containing UDF model definition. + The module_file must implement a function named `trainer_fn` at its + top level. The function must have the following signature. + + def trainer_fn(tf.contrib.training.HParams, + tensorflow_metadata.proto.v0.schema_pb2) -> Dict: + ... + + where the returned Dict has the following key-values. + 'estimator': an instance of tf.estimator.Estimator + 'train_spec': an instance of tf.estimator.TrainSpec + 'eval_spec': an instance of tf.estimator.EvalSpec + 'eval_input_receiver_fn': an instance of tfma.export.EvalInputReceiver + + Exactly one of 'module_file' or 'trainer_fn' must be supplied. + trainer_fn: A python path to UDF model definition function. See + 'module_file' for the required signature of the UDF. + Exactly one of 'module_file' or 'trainer_fn' must be supplied. + train_args: A trainer_pb2.TrainArgs instance, containing args used for + training. Current only num_steps is available. + eval_args: A trainer_pb2.EvalArgs instance, containing args used for eval. + Current only num_steps is available. + #custom_config: A dict which contains the training job parameters to be + # passed to Google Cloud ML Engine. For the full set of parameters + # supported by Google Cloud ML Engine, refer to + # https://cloud.google.com/ml-engine/reference/rest/v1/projects.jobs#Job + #custom_executor_spec: Optional custom executor spec. + Returns: + output: Optional 'ModelExportPath' channel for result of exported models. + Raises: + ValueError: + - When both or neither of 'module_file' and 'trainer_fn' is supplied. + - When both or neither of 'examples' and 'transformed_examples' + is supplied. + - When 'transformed_examples' is supplied but 'transform_output' + is not supplied. 
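The `module_file` contract described above can be exercised with a small module. A minimal sketch, where the feature column, model, and in-memory dataset are illustrative placeholders and `eval_input_receiver_fn` is stubbed rather than a real `tfma.export.EvalInputReceiver`:

```python
import tensorflow as tf

def _input_fn():
    # Tiny in-memory stand-in; real code reads the materialized train/eval
    # TFRecord splits using the schema passed to trainer_fn.
    dataset = tf.data.Dataset.from_tensor_slices(
        ({'x': [[0.0], [1.0]]}, [0, 1]))
    return dataset.repeat().batch(2)

def trainer_fn(hparams, schema):
    # Returns the four entries documented above as a dictionary.
    estimator = tf.estimator.LinearClassifier(
        feature_columns=[tf.feature_column.numeric_column('x')])
    return {
        'estimator': estimator,
        'train_spec': tf.estimator.TrainSpec(input_fn=_input_fn, max_steps=100),
        'eval_spec': tf.estimator.EvalSpec(input_fn=_input_fn),
        # Placeholder only; a real module returns a function that builds a
        # tfma.export.EvalInputReceiver here.
        'eval_input_receiver_fn': lambda: None,
    }
```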
+inputs: +- name: examples + type: Examples +- name: transform_output + type: TransformGraph +- name: schema + type: Schema +- name: module_file + type: String + optional: true +- name: trainer_fn + type: String + optional: true +- name: train_args + type: 'JsonObject: tfx.proto.trainer_pb2.TrainArgs' + optional: true +- name: eval_args + type: 'JsonObject: tfx.proto.trainer_pb2.EvalArgs' + optional: true +outputs: +- name: output + type: Model +implementation: + container: + image: tensorflow/tfx:0.15.0 + command: + - python3 + - -u + - -c + - | + class OutputPath: + '''When creating component from function, OutputPath should be used as function parameter annotation to tell the system that the function wants to output data by writing it into a file with the given path instead of returning the data from the function.''' + def __init__(self, type=None): + self.type = type + + class InputPath: + '''When creating component from function, InputPath should be used as function parameter annotation to tell the system to pass the *data file path* to the function instead of passing the actual data.''' + def __init__(self, type=None): + self.type = type + + def _make_parent_dirs_and_return_path(file_path: str): + import os + os.makedirs(os.path.dirname(file_path), exist_ok=True) + return file_path + + def Trainer( + examples_path: InputPath('Examples'), + transform_output_path: InputPath('TransformGraph'), # ? = None + #transform_graph_path: InputPath('TransformGraph'), + schema_path: InputPath('Schema'), + + output_path: OutputPath('Model'), + + module_file: str = None, + trainer_fn: str = None, + train_args: 'JsonObject: tfx.proto.trainer_pb2.TrainArgs' = None, + eval_args: 'JsonObject: tfx.proto.trainer_pb2.EvalArgs' = None, + #custom_config: dict = None, + #custom_executor_spec: Optional[executor_spec.ExecutorSpec] = None, + ): + """ + A TFX component to train a TensorFlow model. + + The Trainer component is used to train and eval a model using given inputs and + a user-supplied estimator. This component includes a custom driver to + optionally grab previous model to warm start from. + + ## Providing an estimator + The TFX executor will use the estimator provided in the `module_file` file + to train the model. The Trainer executor will look specifically for the + `trainer_fn()` function within that file. Before training, the executor will + call that function expecting the following returned as a dictionary: + + - estimator: The + [estimator](https://www.tensorflow.org/api_docs/python/tf/estimator/Estimator) + to be used by TensorFlow to train the model. + - train_spec: The + [configuration](https://www.tensorflow.org/api_docs/python/tf/estimator/TrainSpec) + to be used by the "train" part of the TensorFlow `train_and_evaluate()` + call. + - eval_spec: The + [configuration](https://www.tensorflow.org/api_docs/python/tf/estimator/EvalSpec) + to be used by the "eval" part of the TensorFlow `train_and_evaluate()` call. + - eval_input_receiver_fn: The + [configuration](https://www.tensorflow.org/tfx/model_analysis/get_started#modify_an_existing_model) + to be used + by the [ModelValidator](https://www.tensorflow.org/tfx/guide/modelval) + component when validating the model. + + An example of `trainer_fn()` can be found in the [user-supplied + code]((https://github.com/tensorflow/tfx/blob/master/tfx/examples/chicago_taxi_pipeline/taxi_utils.py)) + of the TFX Chicago Taxi pipeline example. 
+
+          Args:
+            examples: A Channel of 'ExamplesPath' type, serving as the source of
+              examples that are used in training (required). May be raw or
+              transformed.
+            transform_output: An optional Channel of 'TransformPath' type,
+              serving as the input transform graph if present.
+            #transform_graph: Forwards compatibility alias for the 'transform_output'
+            #  argument.
+            schema: A Channel of 'SchemaPath' type, serving as the schema of
+              training and eval data.
+            module_file: A path to a python module file containing a UDF model
+              definition. The module_file must implement a function named
+              `trainer_fn` at its top level. The function must have the following
+              signature.
+
+              def trainer_fn(tf.contrib.training.HParams,
+                             tensorflow_metadata.proto.v0.schema_pb2) -> Dict:
+                ...
+
+              where the returned Dict has the following key-values.
+                'estimator': an instance of tf.estimator.Estimator
+                'train_spec': an instance of tf.estimator.TrainSpec
+                'eval_spec': an instance of tf.estimator.EvalSpec
+                'eval_input_receiver_fn': an instance of tfma.export.EvalInputReceiver
+
+              Exactly one of 'module_file' or 'trainer_fn' must be supplied.
+            trainer_fn: A python path to the UDF model definition function. See
+              'module_file' for the required signature of the UDF.
+              Exactly one of 'module_file' or 'trainer_fn' must be supplied.
+            train_args: A trainer_pb2.TrainArgs instance, containing args used
+              for training. Currently only num_steps is available.
+            eval_args: A trainer_pb2.EvalArgs instance, containing args used for
+              eval. Currently only num_steps is available.
+            #custom_config: A dict which contains the training job parameters to be
+            #  passed to Google Cloud ML Engine. For the full set of parameters
+            #  supported by Google Cloud ML Engine, refer to
+            #  https://cloud.google.com/ml-engine/reference/rest/v1/projects.jobs#Job
+            #custom_executor_spec: Optional custom executor spec.
+          Returns:
+            output: Optional 'ModelExportPath' channel for the result of exported models.
+          Raises:
+            ValueError:
+              - When both or neither of 'module_file' and 'trainer_fn' is supplied.
+              - When both or neither of 'examples' and 'transformed_examples'
+                is supplied.
+              - When 'transformed_examples' is supplied but 'transform_output'
+                is not supplied.
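+
+          For example, the proto-typed arguments can be passed as JSON strings;
+          the sample pipeline notebook in this directory tree calls the
+          component with (illustrative values):
+
+              train_args='{"num_steps": 10000}'
+              eval_args='{"num_steps": 5000}'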
+ """ + from tfx.components.trainer.component import Trainer + component_class = Trainer + input_channels_with_splits = {'examples'} + output_channels_with_splits = {} + + import json + import os + from google.protobuf import json_format, message + from tfx.types import Artifact, channel_utils + + arguments = locals().copy() + + component_class_args = {} + + for name, execution_parameter in component_class.SPEC_CLASS.PARAMETERS.items(): + argument_value_obj = argument_value = arguments.get(name, None) + if argument_value is None: + continue + parameter_type = execution_parameter.type + if isinstance(parameter_type, type) and issubclass(parameter_type, message.Message): # execution_parameter.type can also be a tuple + argument_value_obj = parameter_type() + json_format.Parse(argument_value, argument_value_obj) + component_class_args[name] = argument_value_obj + + for name, channel_parameter in component_class.SPEC_CLASS.INPUTS.items(): + artifact_path = arguments[name + '_path'] + artifacts = [] + if name in input_channels_with_splits: + # Recovering splits + splits = sorted(os.listdir(artifact_path)) + for split in splits: + artifact = Artifact(type_name=channel_parameter.type_name) + artifact.split = split + artifact.uri = os.path.join(artifact_path, split) + '/' + artifacts.append(artifact) + else: + artifact = Artifact(type_name=channel_parameter.type_name) + artifact.uri = artifact_path + '/' # ? + artifacts.append(artifact) + component_class_args[name] = channel_utils.as_channel(artifacts) + + component_class_instance = component_class(**component_class_args) + + input_dict = {name: channel.get() for name, channel in component_class_instance.inputs.get_all().items()} + output_dict = {name: channel.get() for name, channel in component_class_instance.outputs.get_all().items()} + exec_properties = component_class_instance.exec_properties + + # Generating paths for output artifacts + for name, artifacts in output_dict.items(): + base_artifact_path = arguments[name + '_path'] + for artifact in artifacts: + artifact.uri = os.path.join(base_artifact_path, artifact.split) # Default split is '' + + print('component instance: ' + str(component_class_instance)) + + #executor = component_class.EXECUTOR_SPEC.executor_class() # Same + executor = component_class_instance.executor_spec.executor_class() + executor.Do( + input_dict=input_dict, + output_dict=output_dict, + exec_properties=exec_properties, + ) + + import argparse + _parser = argparse.ArgumentParser(prog='Trainer', description='A TFX component to train a TensorFlow model.\n\n The Trainer component is used to train and eval a model using given inputs and\n a user-supplied estimator. This component includes a custom driver to\n optionally grab previous model to warm start from.\n\n ## Providing an estimator\n The TFX executor will use the estimator provided in the `module_file` file\n to train the model. The Trainer executor will look specifically for the\n `trainer_fn()` function within that file. 
Before training, the executor will\n call that function expecting the following returned as a dictionary:\n\n - estimator: The\n [estimator](https://www.tensorflow.org/api_docs/python/tf/estimator/Estimator)\n to be used by TensorFlow to train the model.\n - train_spec: The\n [configuration](https://www.tensorflow.org/api_docs/python/tf/estimator/TrainSpec)\n to be used by the "train" part of the TensorFlow `train_and_evaluate()`\n call.\n - eval_spec: The\n [configuration](https://www.tensorflow.org/api_docs/python/tf/estimator/EvalSpec)\n to be used by the "eval" part of the TensorFlow `train_and_evaluate()` call.\n - eval_input_receiver_fn: The\n [configuration](https://www.tensorflow.org/tfx/model_analysis/get_started#modify_an_existing_model)\n to be used\n by the [ModelValidator](https://www.tensorflow.org/tfx/guide/modelval)\n component when validating the model.\n\n An example of `trainer_fn()` can be found in the [user-supplied\n code]((https://github.com/tensorflow/tfx/blob/master/tfx/examples/chicago_taxi_pipeline/taxi_utils.py))\n of the TFX Chicago Taxi pipeline example.\n\n\n Args:\n examples: A Channel of \'ExamplesPath\' type, serving as the source of\n examples that are used in training (required). May be raw or\n transformed.\n transform_output: An optional Channel of \'TransformPath\' type, serving as\n the input transform graph if present.\n #transform_graph: Forwards compatibility alias for the \'transform_output\'\n # argument.\n schema: A Channel of \'SchemaPath\' type, serving as the schema of training\n and eval data.\n module_file: A path to python module file containing UDF model definition.\n The module_file must implement a function named `trainer_fn` at its\n top level. The function must have the following signature.\n\n def trainer_fn(tf.contrib.training.HParams,\n tensorflow_metadata.proto.v0.schema_pb2) -> Dict:\n ...\n\n where the returned Dict has the following key-values.\n \'estimator\': an instance of tf.estimator.Estimator\n \'train_spec\': an instance of tf.estimator.TrainSpec\n \'eval_spec\': an instance of tf.estimator.EvalSpec\n \'eval_input_receiver_fn\': an instance of tfma.export.EvalInputReceiver\n\n Exactly one of \'module_file\' or \'trainer_fn\' must be supplied.\n trainer_fn: A python path to UDF model definition function. See\n \'module_file\' for the required signature of the UDF.\n Exactly one of \'module_file\' or \'trainer_fn\' must be supplied.\n train_args: A trainer_pb2.TrainArgs instance, containing args used for\n training. Current only num_steps is available.\n eval_args: A trainer_pb2.EvalArgs instance, containing args used for eval.\n Current only num_steps is available.\n #custom_config: A dict which contains the training job parameters to be\n # passed to Google Cloud ML Engine. 
For the full set of parameters\n # supported by Google Cloud ML Engine, refer to\n # https://cloud.google.com/ml-engine/reference/rest/v1/projects.jobs#Job\n #custom_executor_spec: Optional custom executor spec.\n Returns:\n output: Optional \'ModelExportPath\' channel for result of exported models.\n Raises:\n ValueError:\n - When both or neither of \'module_file\' and \'trainer_fn\' is supplied.\n - When both or neither of \'examples\' and \'transformed_examples\'\n is supplied.\n - When \'transformed_examples\' is supplied but \'transform_output\'\n is not supplied.\n') + _parser.add_argument("--examples", dest="examples_path", type=str, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--transform-output", dest="transform_output_path", type=str, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--schema", dest="schema_path", type=str, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--module-file", dest="module_file", type=str, required=False, default=argparse.SUPPRESS) + _parser.add_argument("--trainer-fn", dest="trainer_fn", type=str, required=False, default=argparse.SUPPRESS) + _parser.add_argument("--train-args", dest="train_args", type=str, required=False, default=argparse.SUPPRESS) + _parser.add_argument("--eval-args", dest="eval_args", type=str, required=False, default=argparse.SUPPRESS) + _parser.add_argument("--output", dest="output_path", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + _output_files = _parsed_args.pop("_output_paths", []) + + _outputs = Trainer(**_parsed_args) + + if not hasattr(_outputs, '__getitem__') or isinstance(_outputs, str): + _outputs = [_outputs] + + _output_serializers = [ + + ] + + import os + for idx, output_file in enumerate(_output_files): + try: + os.makedirs(os.path.dirname(output_file)) + except OSError: + pass + with open(output_file, 'w') as f: + f.write(_output_serializers[idx](_outputs[idx])) + args: + - --examples + - inputPath: examples + - --transform-output + - inputPath: transform_output + - --schema + - inputPath: schema + - if: + cond: + isPresent: module_file + then: + - --module-file + - inputValue: module_file + - if: + cond: + isPresent: trainer_fn + then: + - --trainer-fn + - inputValue: trainer_fn + - if: + cond: + isPresent: train_args + then: + - --train-args + - inputValue: train_args + - if: + cond: + isPresent: eval_args + then: + - --eval-args + - inputValue: eval_args + - --output + - outputPath: output diff --git a/components/tfx/Transform/component.py b/components/tfx/Transform/component.py new file mode 100644 index 00000000000..ed4d6300c4d --- /dev/null +++ b/components/tfx/Transform/component.py @@ -0,0 +1,141 @@ +# flake8: noqa TODO + +from kfp.components import InputPath, OutputPath + + +def Transform( + input_data_path: InputPath('Examples'), + #examples: InputPath('Examples'), + schema_path: InputPath('Schema'), + + transform_output_path: OutputPath('TransformGraph'), + #transform_graph_path: OutputPath('TransformGraph'), + transformed_examples_path: OutputPath('Examples'), + + module_file: 'Uri' = None, + preprocessing_fn: str = None, +): + """A TFX component to transform the input examples. + + The Transform component wraps TensorFlow Transform (tf.Transform) to + preprocess data in a TFX pipeline. 
This component will load the
+    preprocessing_fn from the input module file, preprocess both 'train' and
+    'eval' splits of the input examples, generate the `tf.Transform` output,
+    and save both the transform function and the transformed examples to the
+    locations requested by the orchestrator.
+
+    ## Providing a preprocessing function
+    The TFX executor will use the preprocessing function provided in the
+    `module_file` file to transform the examples. The Transform executor will
+    look specifically for the `preprocessing_fn()` function within that file.
+
+    An example of `preprocessing_fn()` can be found in the [user-supplied
+    code](https://github.com/tensorflow/tfx/blob/master/tfx/examples/chicago_taxi_pipeline/taxi_utils.py)
+    of the TFX Chicago Taxi pipeline example; a minimal sketch is also
+    included at the end of this docstring.
+
+    Args:
+      input_data: A Channel of 'Examples' type (required). This should
+        contain the two splits 'train' and 'eval'.
+      #examples: Forwards compatibility alias for the 'input_data' argument.
+      schema: A Channel of 'SchemaPath' type. This should contain a single
+        schema artifact.
+      module_file: The file path to a python module file, from which the
+        'preprocessing_fn' function will be loaded. The function must have the
+        following signature.
+
+        def preprocessing_fn(inputs: Dict[Text, Any]) -> Dict[Text, Any]:
+          ...
+
+        where the values of the input and returned Dicts are either tf.Tensor
+        or tf.SparseTensor. Exactly one of 'module_file' or 'preprocessing_fn'
+        must be supplied.
+      preprocessing_fn: The path to a python function that implements
+        'preprocessing_fn'. See 'module_file' for the expected signature of
+        the function. Exactly one of 'module_file' or 'preprocessing_fn' must
+        be supplied.
+
+    Returns:
+      transform_output: Optional output 'TransformPath' channel for the output
+        of 'tf.Transform', which includes an exported TensorFlow graph
+        suitable for both training and serving;
+      transformed_examples: Optional output 'ExamplesPath' channel for
+        materialized transformed examples, which includes both 'train' and
+        'eval' splits.
+
+    Raises:
+      ValueError: When both or neither of 'module_file' and 'preprocessing_fn'
+        is supplied.
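+
+    A minimal sketch of a `preprocessing_fn` (hypothetical feature names; the
+    linked taxi_utils.py is a complete, working example):
+
+        import tensorflow_transform as tft
+
+        def preprocessing_fn(inputs):
+            # tft analyzers make a full pass over the dataset to compute
+            # statistics (e.g. mean/stddev, vocabularies) before transforming.
+            return {
+                'fare_scaled': tft.scale_to_z_score(inputs['fare']),
+                'payment_type_id': tft.compute_and_apply_vocabulary(
+                    inputs['payment_type']),
+            }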
+ """ + from tfx.components.transform.component import Transform + component_class = Transform + input_channels_with_splits = {'input_data', 'examples'} + output_channels_with_splits = {'transformed_examples'} + + + import json + import os + import tfx + from google.protobuf import json_format, message + from tfx.types import Artifact, channel_utils + + arguments = locals().copy() + + component_class_args = {} + + for name, execution_parameter in component_class.SPEC_CLASS.PARAMETERS.items(): + argument_value_obj = argument_value = arguments.get(name, None) + if argument_value is None: + continue + parameter_type = execution_parameter.type + if isinstance(parameter_type, type) and issubclass(parameter_type, message.Message): # Maybe FIX: execution_parameter.type can also be a tuple + argument_value_obj = parameter_type() + json_format.Parse(argument_value, argument_value_obj) + component_class_args[name] = argument_value_obj + + for name, channel_parameter in component_class.SPEC_CLASS.INPUTS.items(): + artifact_path = arguments[name + '_path'] + artifacts = [] + if name in input_channels_with_splits: + # Recovering splits + splits = sorted(os.listdir(artifact_path)) + for split in splits: + artifact = Artifact(type_name=channel_parameter.type_name) + artifact.split = split + artifact.uri = os.path.join(artifact_path, split) + '/' + artifacts.append(artifact) + else: + artifact = Artifact(type_name=channel_parameter.type_name) + artifact.uri = artifact_path + '/' # ? + artifacts.append(artifact) + component_class_args[name] = channel_utils.as_channel(artifacts) + + component_class_instance = component_class(**component_class_args) + + input_dict = {name: channel.get() for name, channel in component_class_instance.inputs.get_all().items()} + output_dict = {name: channel.get() for name, channel in component_class_instance.outputs.get_all().items()} + exec_properties = component_class_instance.exec_properties + + # Generating paths for output artifacts + for name, artifacts in output_dict.items(): + base_artifact_path = arguments[name + '_path'] + for artifact in artifacts: + artifact.uri = os.path.join(base_artifact_path, artifact.split) # Default split is '' + + print('component instance: ' + str(component_class_instance)) + + #executor = component_class.EXECUTOR_SPEC.executor_class() # Same + executor = component_class_instance.executor_spec.executor_class() + executor.Do( + input_dict=input_dict, + output_dict=output_dict, + exec_properties=exec_properties, + ) + + + +if __name__ == '__main__': + import kfp + kfp.components.func_to_container_op( + Transform, + base_image='tensorflow/tfx:0.15.0', + output_component_file='component.yaml' + ) diff --git a/components/tfx/Transform/component.yaml b/components/tfx/Transform/component.yaml new file mode 100644 index 00000000000..b3302d14387 --- /dev/null +++ b/components/tfx/Transform/component.yaml @@ -0,0 +1,264 @@ +name: Transform +description: | + A TFX component to transform the input examples. + + The Transform component wraps TensorFlow Transform (tf.Transform) to + preprocess data in a TFX pipeline. This component will load the + preprocessing_fn from input module file, preprocess both 'train' and 'eval' + splits of input examples, generate the `tf.Transform` output, and save both + transform function and transformed examples to orchestrator desired locations. + + ## Providing a preprocessing function + The TFX executor will use the estimator provided in the `module_file` file + to train the model. 
The Transform executor will look specifically for the
+  `preprocessing_fn()` function within that file.
+
+  An example of `preprocessing_fn()` can be found in the [user-supplied
+  code](https://github.com/tensorflow/tfx/blob/master/tfx/examples/chicago_taxi_pipeline/taxi_utils.py)
+  of the TFX Chicago Taxi pipeline example.
+
+  Args:
+    input_data: A Channel of 'Examples' type (required). This should
+      contain the two splits 'train' and 'eval'.
+    #examples: Forwards compatibility alias for the 'input_data' argument.
+    schema: A Channel of 'SchemaPath' type. This should contain a single
+      schema artifact.
+    module_file: The file path to a python module file, from which the
+      'preprocessing_fn' function will be loaded. The function must have the
+      following signature.
+
+      def preprocessing_fn(inputs: Dict[Text, Any]) -> Dict[Text, Any]:
+        ...
+
+      where the values of the input and returned Dicts are either tf.Tensor
+      or tf.SparseTensor. Exactly one of 'module_file' or 'preprocessing_fn'
+      must be supplied.
+    preprocessing_fn: The path to a python function that implements
+      'preprocessing_fn'. See 'module_file' for the expected signature of the
+      function. Exactly one of 'module_file' or 'preprocessing_fn' must
+      be supplied.
+
+  Returns:
+    transform_output: Optional output 'TransformPath' channel for the output
+      of 'tf.Transform', which includes an exported TensorFlow graph suitable
+      for both training and serving;
+    transformed_examples: Optional output 'ExamplesPath' channel for
+      materialized transformed examples, which includes both 'train' and
+      'eval' splits.
+
+  Raises:
+    ValueError: When both or neither of 'module_file' and 'preprocessing_fn'
+      is supplied.
+inputs:
+- name: input_data
+  type: Examples
+- name: schema
+  type: Schema
+- name: module_file
+  type: Uri
+  optional: true
+- name: preprocessing_fn
+  type: String
+  optional: true
+outputs:
+- name: transform_output
+  type: TransformGraph
+- name: transformed_examples
+  type: Examples
+implementation:
+  container:
+    image: tensorflow/tfx:0.15.0
+    command:
+    - python3
+    - -u
+    - -c
+    - |
+      class OutputPath:
+          '''When creating component from function, OutputPath should be used as function parameter annotation to tell the system that the function wants to output data by writing it into a file with the given path instead of returning the data from the function.'''
+          def __init__(self, type=None):
+              self.type = type
+
+      class InputPath:
+          '''When creating component from function, InputPath should be used as function parameter annotation to tell the system to pass the *data file path* to the function instead of passing the actual data.'''
+          def __init__(self, type=None):
+              self.type = type
+
+      def _make_parent_dirs_and_return_path(file_path: str):
+          import os
+          os.makedirs(os.path.dirname(file_path), exist_ok=True)
+          return file_path
+
+      def Transform(
+          input_data_path: InputPath('Examples'),
+          #examples: InputPath('Examples'),
+          schema_path: InputPath('Schema'),
+
+          transform_output_path: OutputPath('TransformGraph'),
+          #transform_graph_path: OutputPath('TransformGraph'),
+          transformed_examples_path: OutputPath('Examples'),
+
+          module_file: 'Uri' = None,
+          preprocessing_fn: str = None,
+      ):
+          """A TFX component to transform the input examples.
+
+          The Transform component wraps TensorFlow Transform (tf.Transform) to
+          preprocess data in a TFX pipeline. 
This component will load the
+          preprocessing_fn from the input module file, preprocess both 'train'
+          and 'eval' splits of the input examples, generate the `tf.Transform`
+          output, and save both the transform function and the transformed
+          examples to the locations requested by the orchestrator.
+
+          ## Providing a preprocessing function
+          The TFX executor will use the preprocessing function provided in the
+          `module_file` file to transform the examples. The Transform executor
+          will look specifically for the `preprocessing_fn()` function within
+          that file.
+
+          An example of `preprocessing_fn()` can be found in the [user-supplied
+          code](https://github.com/tensorflow/tfx/blob/master/tfx/examples/chicago_taxi_pipeline/taxi_utils.py)
+          of the TFX Chicago Taxi pipeline example.
+
+          Args:
+            input_data: A Channel of 'Examples' type (required). This should
+              contain the two splits 'train' and 'eval'.
+            #examples: Forwards compatibility alias for the 'input_data' argument.
+            schema: A Channel of 'SchemaPath' type. This should contain a single
+              schema artifact.
+            module_file: The file path to a python module file, from which the
+              'preprocessing_fn' function will be loaded. The function must have
+              the following signature.
+
+              def preprocessing_fn(inputs: Dict[Text, Any]) -> Dict[Text, Any]:
+                ...
+
+              where the values of the input and returned Dicts are either
+              tf.Tensor or tf.SparseTensor. Exactly one of 'module_file' or
+              'preprocessing_fn' must be supplied.
+            preprocessing_fn: The path to a python function that implements
+              'preprocessing_fn'. See 'module_file' for the expected signature
+              of the function. Exactly one of 'module_file' or
+              'preprocessing_fn' must be supplied.
+
+          Returns:
+            transform_output: Optional output 'TransformPath' channel for the
+              output of 'tf.Transform', which includes an exported TensorFlow
+              graph suitable for both training and serving;
+            transformed_examples: Optional output 'ExamplesPath' channel for
+              materialized transformed examples, which includes both 'train'
+              and 'eval' splits.
+
+          Raises:
+            ValueError: When both or neither of 'module_file' and
+              'preprocessing_fn' is supplied.
+          """
+          from tfx.components.transform.component import Transform
+          component_class = Transform
+          input_channels_with_splits = {'input_data', 'examples'}
+          output_channels_with_splits = {'transformed_examples'}
+
+          import json
+          import os
+          from google.protobuf import json_format, message
+          from tfx.types import Artifact, channel_utils
+
+          arguments = locals().copy()
+
+          component_class_args = {}
+
+          for name, execution_parameter in component_class.SPEC_CLASS.PARAMETERS.items():
+              argument_value_obj = argument_value = arguments.get(name, None)
+              if argument_value is None:
+                  continue
+              parameter_type = execution_parameter.type
+              if isinstance(parameter_type, type) and issubclass(parameter_type, message.Message):  # execution_parameter.type can also be a tuple
+                  argument_value_obj = parameter_type()
+                  json_format.Parse(argument_value, argument_value_obj)
+              component_class_args[name] = argument_value_obj
+
+          for name, channel_parameter in component_class.SPEC_CLASS.INPUTS.items():
+              artifact_path = arguments[name + '_path']
+              artifacts = []
+              if name in input_channels_with_splits:
+                  # Recovering splits
+                  splits = sorted(os.listdir(artifact_path))
+                  for split in splits:
+                      artifact = Artifact(type_name=channel_parameter.type_name)
+                      artifact.split = split
+                      artifact.uri = os.path.join(artifact_path, split) + '/'
+                      artifacts.append(artifact)
+              else:
+                  artifact = Artifact(type_name=channel_parameter.type_name)
+                  artifact.uri = artifact_path + '/'  # ?
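+                  # TFX treats artifact URIs as directory paths; the trailing
+                  # '/' above marks the URI as a directory.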
+ artifacts.append(artifact) + component_class_args[name] = channel_utils.as_channel(artifacts) + + component_class_instance = component_class(**component_class_args) + + input_dict = {name: channel.get() for name, channel in component_class_instance.inputs.get_all().items()} + output_dict = {name: channel.get() for name, channel in component_class_instance.outputs.get_all().items()} + exec_properties = component_class_instance.exec_properties + + # Generating paths for output artifacts + for name, artifacts in output_dict.items(): + base_artifact_path = arguments[name + '_path'] + for artifact in artifacts: + artifact.uri = os.path.join(base_artifact_path, artifact.split) # Default split is '' + + print('component instance: ' + str(component_class_instance)) + + #executor = component_class.EXECUTOR_SPEC.executor_class() # Same + executor = component_class_instance.executor_spec.executor_class() + executor.Do( + input_dict=input_dict, + output_dict=output_dict, + exec_properties=exec_properties, + ) + + import argparse + _parser = argparse.ArgumentParser(prog='Transform', description="A TFX component to transform the input examples.\n\n The Transform component wraps TensorFlow Transform (tf.Transform) to\n preprocess data in a TFX pipeline. This component will load the\n preprocessing_fn from input module file, preprocess both 'train' and 'eval'\n splits of input examples, generate the `tf.Transform` output, and save both\n transform function and transformed examples to orchestrator desired locations.\n\n ## Providing a preprocessing function\n The TFX executor will use the estimator provided in the `module_file` file\n to train the model. The Transform executor will look specifically for the\n `preprocessing_fn()` function within that file.\n\n An example of `preprocessing_fn()` can be found in the [user-supplied\n code]((https://github.com/tensorflow/tfx/blob/master/tfx/examples/chicago_taxi_pipeline/taxi_utils.py))\n of the TFX Chicago Taxi pipeline example.\n\n Args:\n input_data: A Channel of 'Examples' type (required). This should\n contain the two splits 'train' and 'eval'.\n #examples: Forwards compatibility alias for the 'input_data' argument.\n schema: A Channel of 'SchemaPath' type. This should contain a single\n schema artifact.\n module_file: The file path to a python module file, from which the\n 'preprocessing_fn' function will be loaded. The function must have the\n following signature.\n\n def preprocessing_fn(inputs: Dict[Text, Any]) -> Dict[Text, Any]:\n ...\n\n where the values of input and returned Dict are either tf.Tensor or\n tf.SparseTensor. Exactly one of 'module_file' or 'preprocessing_fn'\n must be supplied.\n preprocessing_fn: The path to python function that implements a\n 'preprocessing_fn'. See 'module_file' for expected signature of the\n function. 
Exactly one of 'module_file' or 'preprocessing_fn' must\n        be supplied.\n\n    Returns:\n      transform_output: Optional output 'TransformPath' channel for output of\n        'tf.Transform', which includes an exported Tensorflow graph suitable for\n        both training and serving;\n      transformed_examples: Optional output 'ExamplesPath' channel for\n        materialized transformed examples, which includes both 'train' and\n        'eval' splits.\n\n    Raises:\n      ValueError: When both or neither of 'module_file' and 'preprocessing_fn'\n        is supplied.\n")
+      _parser.add_argument("--input-data", dest="input_data_path", type=str, required=True, default=argparse.SUPPRESS)
+      _parser.add_argument("--schema", dest="schema_path", type=str, required=True, default=argparse.SUPPRESS)
+      _parser.add_argument("--module-file", dest="module_file", type=str, required=False, default=argparse.SUPPRESS)
+      _parser.add_argument("--preprocessing-fn", dest="preprocessing_fn", type=str, required=False, default=argparse.SUPPRESS)
+      _parser.add_argument("--transform-output", dest="transform_output_path", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS)
+      _parser.add_argument("--transformed-examples", dest="transformed_examples_path", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS)
+      _parsed_args = vars(_parser.parse_args())
+      _output_files = _parsed_args.pop("_output_paths", [])
+
+      _outputs = Transform(**_parsed_args)
+
+      if not hasattr(_outputs, '__getitem__') or isinstance(_outputs, str):
+          _outputs = [_outputs]
+
+      _output_serializers = [
+
+      ]
+
+      import os
+      for idx, output_file in enumerate(_output_files):
+          try:
+              os.makedirs(os.path.dirname(output_file))
+          except OSError:
+              pass
+          with open(output_file, 'w') as f:
+              f.write(_output_serializers[idx](_outputs[idx]))
+    args:
+    - --input-data
+    - inputPath: input_data
+    - --schema
+    - inputPath: schema
+    - if:
+        cond:
+          isPresent: module_file
+        then:
+        - --module-file
+        - inputValue: module_file
+    - if:
+        cond:
+          isPresent: preprocessing_fn
+        then:
+        - --preprocessing-fn
+        - inputValue: preprocessing_fn
+    - --transform-output
+    - outputPath: transform_output
+    - --transformed-examples
+    - outputPath: transformed_examples
diff --git a/components/tfx/_samples/TFX_pipeline.ipynb b/components/tfx/_samples/TFX_pipeline.ipynb
new file mode 100644
index 00000000000..f3e63dbf251
--- /dev/null
+++ b/components/tfx/_samples/TFX_pipeline.ipynb
@@ -0,0 +1,162 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "### TFX Components\n",
+    "\n",
+    "This notebook shows how to create a pipeline that uses the following TFX components:\n",
+    "\n",
+    "* CsvExampleGen\n",
+    "* StatisticsGen\n",
+    "* SchemaGen\n",
+    "* ExampleValidator\n",
+    "* Transform\n",
+    "* Trainer\n",
+    "* Evaluator"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import kfp\n",
+    "\n",
+    "# Initializing the client\n",
+    "client = kfp.Client()\n",
+    "\n",
+    "# ! Use kfp.Client(host='https://xxxxx.notebooks.googleusercontent.com/') if working from GCP notebooks (or local notebooks)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "input_data_uri = 'gs:///tensorflow-tfx/tfx/components/testdata/external/csv'\n",
+    "\n",
+    "# Only S3/GCS is supported. 
Replace with a downloading component or a git clone step.\n",
+    "#module_file = 'https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/chicago_taxi_pipeline/taxi_utils.py'\n",
+    "module_file = 'gs:///tensorflow-tfx/tfx/examples/chicago_taxi_pipeline/taxi_utils.py'"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import json\n",
+    "from kfp.components import load_component_from_url\n",
+    "\n",
+    "download_from_gcs_op = load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/d013b8535666641ca5a5be6ce67e69e044bbf076/components/google-cloud/storage/download/component.yaml')\n",
+    "\n",
+    "CsvExampleGen_op = load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/025c424a/components/tfx/ExampleGen/CsvExampleGen/component.yaml')\n",
+    "StatisticsGen_op = load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/025c424a/components/tfx/StatisticsGen/component.yaml')\n",
+    "SchemaGen_op = load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/025c424a/components/tfx/SchemaGen/component.yaml')\n",
+    "ExampleValidator_op = load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/025c424a/components/tfx/ExampleValidator/component.yaml')\n",
+    "Transform_op = load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/025c424a/components/tfx/Transform/component.yaml')\n",
+    "Trainer_op = load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/025c424a/components/tfx/Trainer/component.yaml')\n",
+    "Evaluator_op = load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/025c424a/components/tfx/Evaluator/component.yaml')\n",
+    "\n",
+    "def tfx_pipeline(\n",
+    "    input_data_uri,\n",
+    "):\n",
+    "    download_task = download_from_gcs_op(\n",
+    "        input_data_uri,\n",
+    "    )\n",
+    "\n",
+    "    examples_task = CsvExampleGen_op(\n",
+    "        input_base=download_task.output,\n",
+    "        input_config=json.dumps({\n",
+    "            \"splits\": [\n",
+    "                {'name': 'data', 'pattern': '*.csv'},\n",
+    "            ]\n",
+    "        }),\n",
+    "        output_config=json.dumps({\n",
+    "            \"splitConfig\": {\n",
+    "                \"splits\": [\n",
+    "                    {'name': 'train', 'hash_buckets': 2},\n",
+    "                    {'name': 'eval', 'hash_buckets': 1},\n",
+    "                ]\n",
+    "            }\n",
+    "        }),\n",
+    "    )\n",
+    "\n",
+    "    statistics_task = StatisticsGen_op(\n",
+    "        examples_task.output,\n",
+    "    )\n",
+    "\n",
+    "    schema_task = SchemaGen_op(\n",
+    "        statistics_task.output,\n",
+    "    )\n",
+    "\n",
+    "    # Performs anomaly detection based on statistics and data schema.\n",
+    "    validator_task = ExampleValidator_op(\n",
+    "        stats=statistics_task.outputs['output'],\n",
+    "        schema=schema_task.outputs['output'],\n",
+    "    )\n",
+    "\n",
+    "    # Performs transformations and feature engineering in training and serving.\n",
+    "    transform_task = Transform_op(\n",
+    "        input_data=examples_task.outputs['example_artifacts'],\n",
+    "        schema=schema_task.outputs['output'],\n",
+    "        module_file=module_file,\n",
+    "    )\n",
+    "\n",
+    "    trainer_task = Trainer_op(\n",
+    "        module_file=module_file,\n",
+    "        examples=transform_task.outputs['transformed_examples'],\n",
+    "        schema=schema_task.outputs['output'],\n",
+    "        transform_output=transform_task.outputs['transform_output'],\n",
+    "        train_args=json.dumps({'num_steps': 10000}),\n",
+    "        eval_args=json.dumps({'num_steps': 5000}),\n",
+    "    )\n",
+    "\n",
+    "    # Uses TFMA to compute evaluation statistics over features of a model.\n",
+    "    model_analyzer = Evaluator_op(\n",
+    "        
examples=examples_task.outputs['example_artifacts'],\n", + " model_exports=trainer_task.outputs['output'],\n", + " feature_slicing_spec=json.dumps({\n", + " 'specs': [\n", + " {'column_for_slicing': ['trip_start_hour']},\n", + " ],\n", + " }),\n", + " )\n", + "\n", + "\n", + "client.create_run_from_pipeline_func(\n", + " tfx_pipeline,\n", + " arguments=dict(\n", + " input_data_uri=input_data_uri,\n", + " ),\n", + ")" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.5.3" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +}