forked from kubeflow/pipelines
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Components - Apache Parquet converters (kubeflow#3834)
* Components - Apache Parquet converters Added components that convert to and from Apache Parquet data format * Added sample pipeline
- Loading branch information
Showing 9 changed files with 436 additions and 0 deletions.
There are no files selected for viewing
37 changes: 37 additions & 0 deletions
37
components/_converters/ApacheParquet/_samples/sample_pipeline.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
import kfp
from kfp import components


# Component definitions are pinned to a specific kubeflow/pipelines commit so
# that the sample pipeline stays reproducible.
_COMPONENTS_URL_PREFIX = 'https://raw.githubusercontent.com/kubeflow/pipelines/0d7d6f41c92bdc05c2825232afe2b47e5cb6c4b3/components/'

component_store = components.ComponentStore(
    url_search_prefixes=[_COMPONENTS_URL_PREFIX],
)

# Dataset source plus the four Parquet/Feather converter components exercised
# by the sample pipeline below.
chicago_taxi_dataset_op = component_store.load_component(
    name='datasets/Chicago_Taxi_Trips')
convert_csv_to_apache_parquet_op = component_store.load_component(
    name='_converters/ApacheParquet/from_CSV')
convert_tsv_to_apache_parquet_op = component_store.load_component(
    name='_converters/ApacheParquet/from_TSV')
convert_apache_parquet_to_apache_arrow_feather_op = component_store.load_component(
    name='_converters/ApacheParquet/to_ApacheArrowFeather')
convert_apache_arrow_feather_to_apache_parquet_op = component_store.load_component(
    name='_converters/ApacheParquet/from_ApacheArrowFeather')
def parquet_pipeline():
    """Round-trips Chicago Taxi data through the Parquet converters.

    Two chains are built: CSV -> Parquet -> Feather -> Parquet, and the same
    starting from TSV, to smoke-test every converter component.
    """
    # Both dataset extracts use the same query; only the output format differs.
    query_args = dict(
        where='trip_start_timestamp >= "2019-01-01" AND trip_start_timestamp < "2019-02-01"',
        select='tips,trip_seconds,trip_miles,pickup_community_area,dropoff_community_area,fare,tolls,extras,trip_total',
        limit=10000,
    )

    csv_data = chicago_taxi_dataset_op(**query_args).output
    tsv_data = chicago_taxi_dataset_op(format='tsv', **query_args).output

    # CSV chain: Parquet, then Feather, then back to Parquet.
    parquet_from_csv = convert_csv_to_apache_parquet_op(csv_data).output
    feather_from_csv = convert_apache_parquet_to_apache_arrow_feather_op(parquet_from_csv).output
    parquet_roundtrip_from_csv = convert_apache_arrow_feather_to_apache_parquet_op(feather_from_csv).output

    # TSV chain: same converters, starting from the tab-separated extract.
    parquet_from_tsv = convert_tsv_to_apache_parquet_op(tsv_data).output
    feather_from_tsv = convert_apache_parquet_to_apache_arrow_feather_op(parquet_from_tsv).output
    parquet_roundtrip_from_tsv = convert_apache_arrow_feather_to_apache_parquet_op(feather_from_tsv).output
|
||
if __name__ == '__main__':
    # Point this at a KFP API endpoint URL to submit the run; None lets the
    # client use its default (e.g. in-cluster) configuration.
    kfp_endpoint = None
    client = kfp.Client(host=kfp_endpoint)
    client.create_run_from_pipeline_func(parquet_pipeline, arguments={})
27 changes: 27 additions & 0 deletions
27
components/_converters/ApacheParquet/from_ApacheArrowFeather/component.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
from kfp.components import InputPath, OutputPath, create_component_from_func | ||
|
||
def convert_apache_arrow_feather_to_apache_parquet(
    data_path: InputPath('ApacheArrowFeather'),
    output_data_path: OutputPath('ApacheParquet'),
):
    '''Converts Apache Arrow Feather to Apache Parquet.

    [Apache Arrow Feather](https://arrow.apache.org/docs/python/feather.html)
    [Apache Parquet](https://parquet.apache.org/)

    Annotations:
        author: Alexey Volkov <alexey.volkov@ark-kun.com>
    '''
    # Imported lazily: pyarrow is only installed inside the component container.
    from pyarrow import feather, parquet

    # Load the Feather file as an Arrow table and serialize it as Parquet.
    parquet.write_table(feather.read_table(data_path), output_data_path)
|
||
|
||
if __name__ == '__main__':
    # Regenerate the component.yaml spec next to this script.
    create_component_from_func(
        convert_apache_arrow_feather_to_apache_parquet,
        base_image='python:3.7',
        packages_to_install=['pyarrow==0.17.1'],
        output_component_file='component.yaml',
    )
74 changes: 74 additions & 0 deletions
74
components/_converters/ApacheParquet/from_ApacheArrowFeather/component.yaml
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,74 @@ | ||
# AUTO-GENERATED by kfp.components.create_component_from_func from the
# component.py next to this file — regenerate via `python component.py`
# instead of editing by hand.
name: Convert apache arrow feather to apache parquet
description: |-
  Converts Apache Arrow Feather to Apache Parquet.

  [Apache Arrow Feather](https://arrow.apache.org/docs/python/feather.html)
  [Apache Parquet](https://parquet.apache.org/)

  Annotations:
      author: Alexey Volkov <alexey.volkov@ark-kun.com>
# Typed file artifacts; concrete paths are injected by the KFP launcher.
inputs:
- {name: data, type: ApacheArrowFeather}
outputs:
- {name: output_data, type: ApacheParquet}
implementation:
  container:
    image: python:3.7
    command:
    - sh
    - -c
    # Install pyarrow at container start (retrying with --user for non-root
    # images), then exec the inlined Python program passed as "$0"/"$@".
    - (PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location
      'pyarrow==0.17.1' || PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install
      --quiet --no-warn-script-location 'pyarrow==0.17.1' --user) && "$0" "$@"
    - python3
    - -u
    - -c
    - |
      def _make_parent_dirs_and_return_path(file_path: str):
          import os
          os.makedirs(os.path.dirname(file_path), exist_ok=True)
          return file_path

      def convert_apache_arrow_feather_to_apache_parquet(
          data_path,
          output_data_path,
      ):
          '''Converts Apache Arrow Feather to Apache Parquet.

          [Apache Arrow Feather](https://arrow.apache.org/docs/python/feather.html)
          [Apache Parquet](https://parquet.apache.org/)

          Annotations:
              author: Alexey Volkov <alexey.volkov@ark-kun.com>
          '''
          from pyarrow import feather, parquet

          table = feather.read_table(data_path)
          parquet.write_table(table, output_data_path)

      import argparse
      _parser = argparse.ArgumentParser(prog='Convert apache arrow feather to apache parquet', description='Converts Apache Arrow Feather to Apache Parquet.\n\n    [Apache Arrow Feather](https://arrow.apache.org/docs/python/feather.html)\n    [Apache Parquet](https://parquet.apache.org/)\n\n    Annotations:\n        author: Alexey Volkov <alexey.volkov@ark-kun.com>')
      _parser.add_argument("--data", dest="data_path", type=str, required=True, default=argparse.SUPPRESS)
      _parser.add_argument("--output-data", dest="output_data_path", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS)
      _parsed_args = vars(_parser.parse_args())
      _output_files = _parsed_args.pop("_output_paths", [])

      _outputs = convert_apache_arrow_feather_to_apache_parquet(**_parsed_args)

      _output_serializers = [

      ]

      import os
      for idx, output_file in enumerate(_output_files):
          try:
              os.makedirs(os.path.dirname(output_file))
          except OSError:
              pass
          with open(output_file, 'w') as f:
              f.write(_output_serializers[idx](_outputs[idx]))
    args:
    - --data
    - {inputPath: data}
    - --output-data
    - {outputPath: output_data}
26 changes: 26 additions & 0 deletions
26
components/_converters/ApacheParquet/from_CSV/component.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
from kfp.components import InputPath, OutputPath, create_component_from_func | ||
|
||
def convert_csv_to_apache_parquet(
    data_path: InputPath('CSV'),
    output_data_path: OutputPath('ApacheParquet'),
):
    '''Converts CSV table to Apache Parquet.

    [Apache Parquet](https://parquet.apache.org/)

    Annotations:
        author: Alexey Volkov <alexey.volkov@ark-kun.com>
    '''
    # Imported lazily: pyarrow is only installed inside the component container.
    from pyarrow import csv, parquet

    # Parse the CSV with pyarrow's native reader and persist it as Parquet.
    parquet.write_table(csv.read_csv(data_path), output_data_path)
|
||
|
||
if __name__ == '__main__':
    # Regenerate the component.yaml spec next to this script.
    create_component_from_func(
        convert_csv_to_apache_parquet,
        base_image='python:3.7',
        packages_to_install=['pyarrow==0.17.1'],
        output_component_file='component.yaml',
    )
72 changes: 72 additions & 0 deletions
72
components/_converters/ApacheParquet/from_CSV/component.yaml
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,72 @@ | ||
# AUTO-GENERATED by kfp.components.create_component_from_func from the
# component.py next to this file — regenerate via `python component.py`
# instead of editing by hand.
name: Convert csv to apache parquet
description: |-
  Converts CSV table to Apache Parquet.

  [Apache Parquet](https://parquet.apache.org/)

  Annotations:
      author: Alexey Volkov <alexey.volkov@ark-kun.com>
# Typed file artifacts; concrete paths are injected by the KFP launcher.
inputs:
- {name: data, type: CSV}
outputs:
- {name: output_data, type: ApacheParquet}
implementation:
  container:
    image: python:3.7
    command:
    - sh
    - -c
    # Install pyarrow at container start (retrying with --user for non-root
    # images), then exec the inlined Python program passed as "$0"/"$@".
    - (PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location
      'pyarrow==0.17.1' || PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install
      --quiet --no-warn-script-location 'pyarrow==0.17.1' --user) && "$0" "$@"
    - python3
    - -u
    - -c
    - |
      def _make_parent_dirs_and_return_path(file_path: str):
          import os
          os.makedirs(os.path.dirname(file_path), exist_ok=True)
          return file_path

      def convert_csv_to_apache_parquet(
          data_path,
          output_data_path,
      ):
          '''Converts CSV table to Apache Parquet.

          [Apache Parquet](https://parquet.apache.org/)

          Annotations:
              author: Alexey Volkov <alexey.volkov@ark-kun.com>
          '''
          from pyarrow import csv, parquet

          table = csv.read_csv(data_path)
          parquet.write_table(table, output_data_path)

      import argparse
      _parser = argparse.ArgumentParser(prog='Convert csv to apache parquet', description='Converts CSV table to Apache Parquet.\n\n    [Apache Parquet](https://parquet.apache.org/)\n\n    Annotations:\n        author: Alexey Volkov <alexey.volkov@ark-kun.com>')
      _parser.add_argument("--data", dest="data_path", type=str, required=True, default=argparse.SUPPRESS)
      _parser.add_argument("--output-data", dest="output_data_path", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS)
      _parsed_args = vars(_parser.parse_args())
      _output_files = _parsed_args.pop("_output_paths", [])

      _outputs = convert_csv_to_apache_parquet(**_parsed_args)

      _output_serializers = [

      ]

      import os
      for idx, output_file in enumerate(_output_files):
          try:
              os.makedirs(os.path.dirname(output_file))
          except OSError:
              pass
          with open(output_file, 'w') as f:
              f.write(_output_serializers[idx](_outputs[idx]))
    args:
    - --data
    - {inputPath: data}
    - --output-data
    - {outputPath: output_data}
26 changes: 26 additions & 0 deletions
26
components/_converters/ApacheParquet/from_TSV/component.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
from kfp.components import InputPath, OutputPath, create_component_from_func | ||
|
||
def convert_tsv_to_apache_parquet(
    data_path: InputPath('TSV'),
    output_data_path: OutputPath('ApacheParquet'),
):
    '''Converts TSV table to Apache Parquet.

    [Apache Parquet](https://parquet.apache.org/)

    Annotations:
        author: Alexey Volkov <alexey.volkov@ark-kun.com>
    '''
    # Imported lazily: pyarrow is only installed inside the component container.
    from pyarrow import csv, parquet

    # Reuse pyarrow's CSV reader with a tab delimiter, then persist as Parquet.
    tab_options = csv.ParseOptions(delimiter='\t')
    table = csv.read_csv(data_path, parse_options=tab_options)
    parquet.write_table(table, output_data_path)
|
||
|
||
if __name__ == '__main__':
    # Regenerate the component.yaml spec next to this script.
    create_component_from_func(
        convert_tsv_to_apache_parquet,
        base_image='python:3.7',
        packages_to_install=['pyarrow==0.17.1'],
        output_component_file='component.yaml',
    )
72 changes: 72 additions & 0 deletions
72
components/_converters/ApacheParquet/from_TSV/component.yaml
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,72 @@ | ||
# AUTO-GENERATED by kfp.components.create_component_from_func from the
# component.py next to this file — regenerate via `python component.py`
# instead of editing by hand.
name: Convert tsv to apache parquet
description: |-
  Converts TSV table to Apache Parquet.

  [Apache Parquet](https://parquet.apache.org/)

  Annotations:
      author: Alexey Volkov <alexey.volkov@ark-kun.com>
# Typed file artifacts; concrete paths are injected by the KFP launcher.
inputs:
- {name: data, type: TSV}
outputs:
- {name: output_data, type: ApacheParquet}
implementation:
  container:
    image: python:3.7
    command:
    - sh
    - -c
    # Install pyarrow at container start (retrying with --user for non-root
    # images), then exec the inlined Python program passed as "$0"/"$@".
    - (PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location
      'pyarrow==0.17.1' || PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install
      --quiet --no-warn-script-location 'pyarrow==0.17.1' --user) && "$0" "$@"
    - python3
    - -u
    - -c
    - |
      def _make_parent_dirs_and_return_path(file_path: str):
          import os
          os.makedirs(os.path.dirname(file_path), exist_ok=True)
          return file_path

      def convert_tsv_to_apache_parquet(
          data_path,
          output_data_path,
      ):
          '''Converts TSV table to Apache Parquet.

          [Apache Parquet](https://parquet.apache.org/)

          Annotations:
              author: Alexey Volkov <alexey.volkov@ark-kun.com>
          '''
          from pyarrow import csv, parquet

          table = csv.read_csv(data_path, parse_options=csv.ParseOptions(delimiter='\t'))
          parquet.write_table(table, output_data_path)

      import argparse
      _parser = argparse.ArgumentParser(prog='Convert tsv to apache parquet', description='Converts TSV table to Apache Parquet.\n\n    [Apache Parquet](https://parquet.apache.org/)\n\n    Annotations:\n        author: Alexey Volkov <alexey.volkov@ark-kun.com>')
      _parser.add_argument("--data", dest="data_path", type=str, required=True, default=argparse.SUPPRESS)
      _parser.add_argument("--output-data", dest="output_data_path", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS)
      _parsed_args = vars(_parser.parse_args())
      _output_files = _parsed_args.pop("_output_paths", [])

      _outputs = convert_tsv_to_apache_parquet(**_parsed_args)

      _output_serializers = [

      ]

      import os
      for idx, output_file in enumerate(_output_files):
          try:
              os.makedirs(os.path.dirname(output_file))
          except OSError:
              pass
          with open(output_file, 'w') as f:
              f.write(_output_serializers[idx](_outputs[idx]))
    args:
    - --data
    - {inputPath: data}
    - --output-data
    - {outputPath: output_data}
27 changes: 27 additions & 0 deletions
27
components/_converters/ApacheParquet/to_ApacheArrowFeather/component.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
from kfp.components import InputPath, OutputPath, create_component_from_func | ||
|
||
def convert_apache_parquet_to_apache_arrow_feather(
    data_path: InputPath('ApacheParquet'),
    output_data_path: OutputPath('ApacheArrowFeather'),
):
    '''Converts Apache Parquet to Apache Arrow Feather.

    [Apache Arrow Feather](https://arrow.apache.org/docs/python/feather.html)
    [Apache Parquet](https://parquet.apache.org/)

    Annotations:
        author: Alexey Volkov <alexey.volkov@ark-kun.com>
    '''
    # Imported lazily: pyarrow is only installed inside the component container.
    from pyarrow import feather, parquet

    # Materialize the Parquet data as a pandas DataFrame (hence the pandas
    # dependency in packages_to_install) before writing it out as Feather.
    df = parquet.read_pandas(data_path).to_pandas()
    feather.write_feather(df, output_data_path)
|
||
|
||
if __name__ == '__main__':
    # Regenerate the component.yaml spec next to this script. The returned op
    # is not used here, so it is not assigned — matching the sibling converter
    # scripts (from_CSV, from_TSV, from_ApacheArrowFeather), which also call
    # create_component_from_func without binding the result.
    create_component_from_func(
        convert_apache_parquet_to_apache_arrow_feather,
        output_component_file='component.yaml',
        base_image='python:3.7',
        packages_to_install=['pyarrow==0.17.1', 'pandas==1.0.3'],
    )
Oops, something went wrong.