From 354365705e6946db37b7794f38708e446325aad2 Mon Sep 17 00:00:00 2001
From: Alexey Volkov
Date: Thu, 28 May 2020 20:17:15 -0700
Subject: [PATCH] Components - Apache Parquet converters (#3834)

* Components - Apache Parquet converters

Added components that convert to and from Apache Parquet data format

* Added sample pipeline
---
Reviewer notes (free text between the three-dash marker and the first
"diff --git" header is not part of the commit message):

  NOTE(review): this copy of the patch had its line breaks collapsed. The
  one-diff-line-per-line layout and the leading indentation of added lines
  were reconstructed (4-space Python, usual component.yaml nesting) and
  checked only against the hunk line counts. TODO confirm against commit
  3543657 before applying. Whitespace inside string literals was left as found.

  All nine files are newly created (every hunk is "-0,0 +1,N").

  Each component.py, when run as a script, calls create_component_from_func
  with output_component_file='component.yaml'; the sibling component.yaml
  embeds the same function body, so the two must be changed together.

 .../ApacheParquet/_samples/sample_pipeline.py | 37 +++++++++
 .../from_ApacheArrowFeather/component.py      | 27 +++++++
 .../from_ApacheArrowFeather/component.yaml    | 74 ++++++++++++++++++
 .../ApacheParquet/from_CSV/component.py       | 26 +++++++
 .../ApacheParquet/from_CSV/component.yaml     | 72 ++++++++++++++++++
 .../ApacheParquet/from_TSV/component.py       | 26 +++++++
 .../ApacheParquet/from_TSV/component.yaml     | 72 ++++++++++++++++++
 .../to_ApacheArrowFeather/component.py        | 27 +++++++
 .../to_ApacheArrowFeather/component.yaml      | 75 +++++++++++++++++++
 9 files changed, 436 insertions(+)
 create mode 100644 components/_converters/ApacheParquet/_samples/sample_pipeline.py
 create mode 100644 components/_converters/ApacheParquet/from_ApacheArrowFeather/component.py
 create mode 100644 components/_converters/ApacheParquet/from_ApacheArrowFeather/component.yaml
 create mode 100644 components/_converters/ApacheParquet/from_CSV/component.py
 create mode 100644 components/_converters/ApacheParquet/from_CSV/component.yaml
 create mode 100644 components/_converters/ApacheParquet/from_TSV/component.py
 create mode 100644 components/_converters/ApacheParquet/from_TSV/component.yaml
 create mode 100644 components/_converters/ApacheParquet/to_ApacheArrowFeather/component.py
 create mode 100644 components/_converters/ApacheParquet/to_ApacheArrowFeather/component.yaml

diff --git a/components/_converters/ApacheParquet/_samples/sample_pipeline.py b/components/_converters/ApacheParquet/_samples/sample_pipeline.py
new file mode 100644
index 000000000000..6aa283ae1c45
--- /dev/null
+++ b/components/_converters/ApacheParquet/_samples/sample_pipeline.py
@@ -0,0 +1,37 @@
+import kfp
+from kfp import components
+
+component_store = components.ComponentStore(url_search_prefixes=['https://raw.githubusercontent.com/kubeflow/pipelines/0d7d6f41c92bdc05c2825232afe2b47e5cb6c4b3/components/'])
+
+chicago_taxi_dataset_op = component_store.load_component(name='datasets/Chicago_Taxi_Trips')
+convert_csv_to_apache_parquet_op = component_store.load_component(name='_converters/ApacheParquet/from_CSV')
+convert_tsv_to_apache_parquet_op = component_store.load_component(name='_converters/ApacheParquet/from_TSV')
+convert_apache_parquet_to_apache_arrow_feather_op = component_store.load_component(name='_converters/ApacheParquet/to_ApacheArrowFeather')
+convert_apache_arrow_feather_to_apache_parquet_op = component_store.load_component(name='_converters/ApacheParquet/from_ApacheArrowFeather')
+
+
+def parquet_pipeline():
+    csv = chicago_taxi_dataset_op(
+        where='trip_start_timestamp >= "2019-01-01" AND trip_start_timestamp < "2019-02-01"',
+        select='tips,trip_seconds,trip_miles,pickup_community_area,dropoff_community_area,fare,tolls,extras,trip_total',
+        limit=10000,
+    ).output
+
+    tsv = chicago_taxi_dataset_op(
+        where='trip_start_timestamp >= "2019-01-01" AND trip_start_timestamp < "2019-02-01"',
+        select='tips,trip_seconds,trip_miles,pickup_community_area,dropoff_community_area,fare,tolls,extras,trip_total',
+        limit=10000,
+        format='tsv',
+    ).output
+
+    csv_parquet = convert_csv_to_apache_parquet_op(csv).output
+    csv_parquet_feather = convert_apache_parquet_to_apache_arrow_feather_op(csv_parquet).output
+    csv_parquet_feather_parquet = convert_apache_arrow_feather_to_apache_parquet_op(csv_parquet_feather).output
+
+    tsv_parquet = convert_tsv_to_apache_parquet_op(tsv).output
+    tsv_parquet_feather = convert_apache_parquet_to_apache_arrow_feather_op(tsv_parquet).output
+    tsv_parquet_feather_parquet = convert_apache_arrow_feather_to_apache_parquet_op(tsv_parquet_feather).output
+
+if __name__ == '__main__':
+    kfp_endpoint = None
+    kfp.Client(host=kfp_endpoint).create_run_from_pipeline_func(parquet_pipeline, arguments={})
diff --git a/components/_converters/ApacheParquet/from_ApacheArrowFeather/component.py b/components/_converters/ApacheParquet/from_ApacheArrowFeather/component.py
new file mode 100644
index 000000000000..a6949b7ce3da
--- /dev/null
+++ b/components/_converters/ApacheParquet/from_ApacheArrowFeather/component.py
@@ -0,0 +1,27 @@
+from kfp.components import InputPath, OutputPath, create_component_from_func
+
+def convert_apache_arrow_feather_to_apache_parquet(
+    data_path: InputPath('ApacheArrowFeather'),
+    output_data_path: OutputPath('ApacheParquet'),
+):
+    '''Converts Apache Arrow Feather to Apache Parquet.
+
+    [Apache Arrow Feather](https://arrow.apache.org/docs/python/feather.html)
+    [Apache Parquet](https://parquet.apache.org/)
+
+    Annotations:
+        author: Alexey Volkov
+    '''
+    from pyarrow import feather, parquet
+
+    table = feather.read_table(data_path)
+    parquet.write_table(table, output_data_path)
+
+
+if __name__ == '__main__':
+    create_component_from_func(
+        convert_apache_arrow_feather_to_apache_parquet,
+        output_component_file='component.yaml',
+        base_image='python:3.7',
+        packages_to_install=['pyarrow==0.17.1']
+    )
diff --git a/components/_converters/ApacheParquet/from_ApacheArrowFeather/component.yaml b/components/_converters/ApacheParquet/from_ApacheArrowFeather/component.yaml
new file mode 100644
index 000000000000..5c14ed7772df
--- /dev/null
+++ b/components/_converters/ApacheParquet/from_ApacheArrowFeather/component.yaml
@@ -0,0 +1,74 @@
+name: Convert apache arrow feather to apache parquet
+description: |-
+  Converts Apache Arrow Feather to Apache Parquet.
+
+      [Apache Arrow Feather](https://arrow.apache.org/docs/python/feather.html)
+      [Apache Parquet](https://parquet.apache.org/)
+
+      Annotations:
+          author: Alexey Volkov
+inputs:
+- {name: data, type: ApacheArrowFeather}
+outputs:
+- {name: output_data, type: ApacheParquet}
+implementation:
+  container:
+    image: python:3.7
+    command:
+    - sh
+    - -c
+    - (PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location
+      'pyarrow==0.17.1' || PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install
+      --quiet --no-warn-script-location 'pyarrow==0.17.1' --user) && "$0" "$@"
+    - python3
+    - -u
+    - -c
+    - |
+      def _make_parent_dirs_and_return_path(file_path: str):
+          import os
+          os.makedirs(os.path.dirname(file_path), exist_ok=True)
+          return file_path
+
+      def convert_apache_arrow_feather_to_apache_parquet(
+          data_path,
+          output_data_path,
+      ):
+          '''Converts Apache Arrow Feather to Apache Parquet.
+
+          [Apache Arrow Feather](https://arrow.apache.org/docs/python/feather.html)
+          [Apache Parquet](https://parquet.apache.org/)
+
+          Annotations:
+              author: Alexey Volkov
+          '''
+          from pyarrow import feather, parquet
+
+          table = feather.read_table(data_path)
+          parquet.write_table(table, output_data_path)
+
+      import argparse
+      _parser = argparse.ArgumentParser(prog='Convert apache arrow feather to apache parquet', description='Converts Apache Arrow Feather to Apache Parquet.\n\n [Apache Arrow Feather](https://arrow.apache.org/docs/python/feather.html)\n [Apache Parquet](https://parquet.apache.org/)\n\n Annotations:\n author: Alexey Volkov ')
+      _parser.add_argument("--data", dest="data_path", type=str, required=True, default=argparse.SUPPRESS)
+      _parser.add_argument("--output-data", dest="output_data_path", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS)
+      _parsed_args = vars(_parser.parse_args())
+      _output_files = _parsed_args.pop("_output_paths", [])
+
+      _outputs = convert_apache_arrow_feather_to_apache_parquet(**_parsed_args)
+
+      _output_serializers = [
+
+      ]
+
+      import os
+      for idx, output_file in enumerate(_output_files):
+          try:
+              os.makedirs(os.path.dirname(output_file))
+          except OSError:
+              pass
+          with open(output_file, 'w') as f:
+              f.write(_output_serializers[idx](_outputs[idx]))
+    args:
+    - --data
+    - {inputPath: data}
+    - --output-data
+    - {outputPath: output_data}
diff --git a/components/_converters/ApacheParquet/from_CSV/component.py b/components/_converters/ApacheParquet/from_CSV/component.py
new file mode 100644
index 000000000000..101aa78311e7
--- /dev/null
+++ b/components/_converters/ApacheParquet/from_CSV/component.py
@@ -0,0 +1,26 @@
+from kfp.components import InputPath, OutputPath, create_component_from_func
+
+def convert_csv_to_apache_parquet(
+    data_path: InputPath('CSV'),
+    output_data_path: OutputPath('ApacheParquet'),
+):
+    '''Converts CSV table to Apache Parquet.
+
+    [Apache Parquet](https://parquet.apache.org/)
+
+    Annotations:
+        author: Alexey Volkov
+    '''
+    from pyarrow import csv, parquet
+
+    table = csv.read_csv(data_path)
+    parquet.write_table(table, output_data_path)
+
+
+if __name__ == '__main__':
+    create_component_from_func(
+        convert_csv_to_apache_parquet,
+        output_component_file='component.yaml',
+        base_image='python:3.7',
+        packages_to_install=['pyarrow==0.17.1']
+    )
diff --git a/components/_converters/ApacheParquet/from_CSV/component.yaml b/components/_converters/ApacheParquet/from_CSV/component.yaml
new file mode 100644
index 000000000000..bb3ac32e2f6b
--- /dev/null
+++ b/components/_converters/ApacheParquet/from_CSV/component.yaml
@@ -0,0 +1,72 @@
+name: Convert csv to apache parquet
+description: |-
+  Converts CSV table to Apache Parquet.
+
+      [Apache Parquet](https://parquet.apache.org/)
+
+      Annotations:
+          author: Alexey Volkov
+inputs:
+- {name: data, type: CSV}
+outputs:
+- {name: output_data, type: ApacheParquet}
+implementation:
+  container:
+    image: python:3.7
+    command:
+    - sh
+    - -c
+    - (PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location
+      'pyarrow==0.17.1' || PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install
+      --quiet --no-warn-script-location 'pyarrow==0.17.1' --user) && "$0" "$@"
+    - python3
+    - -u
+    - -c
+    - |
+      def _make_parent_dirs_and_return_path(file_path: str):
+          import os
+          os.makedirs(os.path.dirname(file_path), exist_ok=True)
+          return file_path
+
+      def convert_csv_to_apache_parquet(
+          data_path,
+          output_data_path,
+      ):
+          '''Converts CSV table to Apache Parquet.
+
+          [Apache Parquet](https://parquet.apache.org/)
+
+          Annotations:
+              author: Alexey Volkov
+          '''
+          from pyarrow import csv, parquet
+
+          table = csv.read_csv(data_path)
+          parquet.write_table(table, output_data_path)
+
+      import argparse
+      _parser = argparse.ArgumentParser(prog='Convert csv to apache parquet', description='Converts CSV table to Apache Parquet.\n\n [Apache Parquet](https://parquet.apache.org/)\n\n Annotations:\n author: Alexey Volkov ')
+      _parser.add_argument("--data", dest="data_path", type=str, required=True, default=argparse.SUPPRESS)
+      _parser.add_argument("--output-data", dest="output_data_path", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS)
+      _parsed_args = vars(_parser.parse_args())
+      _output_files = _parsed_args.pop("_output_paths", [])
+
+      _outputs = convert_csv_to_apache_parquet(**_parsed_args)
+
+      _output_serializers = [
+
+      ]
+
+      import os
+      for idx, output_file in enumerate(_output_files):
+          try:
+              os.makedirs(os.path.dirname(output_file))
+          except OSError:
+              pass
+          with open(output_file, 'w') as f:
+              f.write(_output_serializers[idx](_outputs[idx]))
+    args:
+    - --data
+    - {inputPath: data}
+    - --output-data
+    - {outputPath: output_data}
diff --git a/components/_converters/ApacheParquet/from_TSV/component.py b/components/_converters/ApacheParquet/from_TSV/component.py
new file mode 100644
index 000000000000..d297171a93ae
--- /dev/null
+++ b/components/_converters/ApacheParquet/from_TSV/component.py
@@ -0,0 +1,26 @@
+from kfp.components import InputPath, OutputPath, create_component_from_func
+
+def convert_tsv_to_apache_parquet(
+    data_path: InputPath('TSV'),
+    output_data_path: OutputPath('ApacheParquet'),
+):
+    '''Converts TSV table to Apache Parquet.
+
+    [Apache Parquet](https://parquet.apache.org/)
+
+    Annotations:
+        author: Alexey Volkov
+    '''
+    from pyarrow import csv, parquet
+
+    table = csv.read_csv(data_path, parse_options=csv.ParseOptions(delimiter='\t'))
+    parquet.write_table(table, output_data_path)
+
+
+if __name__ == '__main__':
+    create_component_from_func(
+        convert_tsv_to_apache_parquet,
+        output_component_file='component.yaml',
+        base_image='python:3.7',
+        packages_to_install=['pyarrow==0.17.1']
+    )
diff --git a/components/_converters/ApacheParquet/from_TSV/component.yaml b/components/_converters/ApacheParquet/from_TSV/component.yaml
new file mode 100644
index 000000000000..499370281c32
--- /dev/null
+++ b/components/_converters/ApacheParquet/from_TSV/component.yaml
@@ -0,0 +1,72 @@
+name: Convert tsv to apache parquet
+description: |-
+  Converts TSV table to Apache Parquet.
+
+      [Apache Parquet](https://parquet.apache.org/)
+
+      Annotations:
+          author: Alexey Volkov
+inputs:
+- {name: data, type: TSV}
+outputs:
+- {name: output_data, type: ApacheParquet}
+implementation:
+  container:
+    image: python:3.7
+    command:
+    - sh
+    - -c
+    - (PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location
+      'pyarrow==0.17.1' || PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install
+      --quiet --no-warn-script-location 'pyarrow==0.17.1' --user) && "$0" "$@"
+    - python3
+    - -u
+    - -c
+    - |
+      def _make_parent_dirs_and_return_path(file_path: str):
+          import os
+          os.makedirs(os.path.dirname(file_path), exist_ok=True)
+          return file_path
+
+      def convert_tsv_to_apache_parquet(
+          data_path,
+          output_data_path,
+      ):
+          '''Converts TSV table to Apache Parquet.
+
+          [Apache Parquet](https://parquet.apache.org/)
+
+          Annotations:
+              author: Alexey Volkov
+          '''
+          from pyarrow import csv, parquet
+
+          table = csv.read_csv(data_path, parse_options=csv.ParseOptions(delimiter='\t'))
+          parquet.write_table(table, output_data_path)
+
+      import argparse
+      _parser = argparse.ArgumentParser(prog='Convert tsv to apache parquet', description='Converts TSV table to Apache Parquet.\n\n [Apache Parquet](https://parquet.apache.org/)\n\n Annotations:\n author: Alexey Volkov ')
+      _parser.add_argument("--data", dest="data_path", type=str, required=True, default=argparse.SUPPRESS)
+      _parser.add_argument("--output-data", dest="output_data_path", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS)
+      _parsed_args = vars(_parser.parse_args())
+      _output_files = _parsed_args.pop("_output_paths", [])
+
+      _outputs = convert_tsv_to_apache_parquet(**_parsed_args)
+
+      _output_serializers = [
+
+      ]
+
+      import os
+      for idx, output_file in enumerate(_output_files):
+          try:
+              os.makedirs(os.path.dirname(output_file))
+          except OSError:
+              pass
+          with open(output_file, 'w') as f:
+              f.write(_output_serializers[idx](_outputs[idx]))
+    args:
+    - --data
+    - {inputPath: data}
+    - --output-data
+    - {outputPath: output_data}
diff --git a/components/_converters/ApacheParquet/to_ApacheArrowFeather/component.py b/components/_converters/ApacheParquet/to_ApacheArrowFeather/component.py
new file mode 100644
index 000000000000..0129334ba7f9
--- /dev/null
+++ b/components/_converters/ApacheParquet/to_ApacheArrowFeather/component.py
@@ -0,0 +1,27 @@
+from kfp.components import InputPath, OutputPath, create_component_from_func
+
+def convert_apache_parquet_to_apache_arrow_feather(
+    data_path: InputPath('ApacheParquet'),
+    output_data_path: OutputPath('ApacheArrowFeather'),
+):
+    '''Converts Apache Parquet to Apache Arrow Feather.
+
+    [Apache Arrow Feather](https://arrow.apache.org/docs/python/feather.html)
+    [Apache Parquet](https://parquet.apache.org/)
+
+    Annotations:
+        author: Alexey Volkov
+    '''
+    from pyarrow import feather, parquet
+
+    data_frame = parquet.read_pandas(data_path).to_pandas()
+    feather.write_feather(data_frame, output_data_path)
+
+
+if __name__ == '__main__':
+    convert_apache_parquet_to_apache_arrow_feather_op = create_component_from_func(
+        convert_apache_parquet_to_apache_arrow_feather,
+        output_component_file='component.yaml',
+        base_image='python:3.7',
+        packages_to_install=['pyarrow==0.17.1', 'pandas==1.0.3']
+    )
diff --git a/components/_converters/ApacheParquet/to_ApacheArrowFeather/component.yaml b/components/_converters/ApacheParquet/to_ApacheArrowFeather/component.yaml
new file mode 100644
index 000000000000..28f64056da3d
--- /dev/null
+++ b/components/_converters/ApacheParquet/to_ApacheArrowFeather/component.yaml
@@ -0,0 +1,75 @@
+name: Convert apache parquet to apache arrow feather
+description: |-
+  Converts Apache Parquet to Apache Arrow Feather.
+
+      [Apache Arrow Feather](https://arrow.apache.org/docs/python/feather.html)
+      [Apache Parquet](https://parquet.apache.org/)
+
+      Annotations:
+          author: Alexey Volkov
+inputs:
+- {name: data, type: ApacheParquet}
+outputs:
+- {name: output_data, type: ApacheArrowFeather}
+implementation:
+  container:
+    image: python:3.7
+    command:
+    - sh
+    - -c
+    - (PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location
+      'pyarrow==0.17.1' 'pandas==1.0.3' || PIP_DISABLE_PIP_VERSION_CHECK=1 python3
+      -m pip install --quiet --no-warn-script-location 'pyarrow==0.17.1' 'pandas==1.0.3'
+      --user) && "$0" "$@"
+    - python3
+    - -u
+    - -c
+    - |
+      def _make_parent_dirs_and_return_path(file_path: str):
+          import os
+          os.makedirs(os.path.dirname(file_path), exist_ok=True)
+          return file_path
+
+      def convert_apache_parquet_to_apache_arrow_feather(
+          data_path,
+          output_data_path,
+      ):
+          '''Converts Apache Parquet to Apache Arrow Feather.
+
+          [Apache Arrow Feather](https://arrow.apache.org/docs/python/feather.html)
+          [Apache Parquet](https://parquet.apache.org/)
+
+          Annotations:
+              author: Alexey Volkov
+          '''
+          from pyarrow import feather, parquet
+
+          data_frame = parquet.read_pandas(data_path).to_pandas()
+          feather.write_feather(data_frame, output_data_path)
+
+      import argparse
+      _parser = argparse.ArgumentParser(prog='Convert apache parquet to apache arrow feather', description='Converts Apache Parquet to Apache Arrow Feather.\n\n [Apache Arrow Feather](https://arrow.apache.org/docs/python/feather.html)\n [Apache Parquet](https://parquet.apache.org/)\n\n Annotations:\n author: Alexey Volkov ')
+      _parser.add_argument("--data", dest="data_path", type=str, required=True, default=argparse.SUPPRESS)
+      _parser.add_argument("--output-data", dest="output_data_path", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS)
+      _parsed_args = vars(_parser.parse_args())
+      _output_files = _parsed_args.pop("_output_paths", [])
+
+      _outputs = convert_apache_parquet_to_apache_arrow_feather(**_parsed_args)
+
+      _output_serializers = [
+
+      ]
+
+      import os
+      for idx, output_file in enumerate(_output_files):
+          try:
+              os.makedirs(os.path.dirname(output_file))
+          except OSError:
+              pass
+          with open(output_file, 'w') as f:
+              f.write(_output_serializers[idx](_outputs[idx]))
+    args:
+    - --data
+    - {inputPath: data}
+    - --output-data
+    - {outputPath: output_data}