From 2d1c8e72faf6b8153b853808270d4dfde47811b5 Mon Sep 17 00:00:00 2001 From: Wei Ji Date: Fri, 21 Aug 2020 09:16:41 +1200 Subject: [PATCH] :sparkles: Ndarray to Parquet convenience function Building on top of eb61ff609a7fa12a419cebb35daf45f072fb8ad3, but for n-dimensional arrays, and writing the dataframe to Parquet too! This function might be a little too convenient (read: contains hardcoding), but it smooths out some of the rough edges in terms of PyData file format interoperability. Should contribute this somewhere upstream when I get the time. --- atlxi_dhdt.ipynb | 2 +- atlxi_dhdt.py | 2 +- deepicedrain/__init__.py | 2 +- deepicedrain/extraload.py | 78 ++++++++++- deepicedrain/tests/test_array_to_dataframe.py | 37 ------ .../tests/test_ndarray_to_dataframe.py | 121 ++++++++++++++++++ 6 files changed, 201 insertions(+), 41 deletions(-) delete mode 100644 deepicedrain/tests/test_array_to_dataframe.py create mode 100644 deepicedrain/tests/test_ndarray_to_dataframe.py diff --git a/atlxi_dhdt.ipynb b/atlxi_dhdt.ipynb index d05e0c1..6cceb72 100644 --- a/atlxi_dhdt.ipynb +++ b/atlxi_dhdt.ipynb @@ -904,7 +904,7 @@ " ds_subset: xr.Dataset = region.subset(data=ds_dhdt)\n", " # Add a UTC_time column to the dataframe\n", " ds_subset[\"utc_time\"] = deepicedrain.deltatime_to_utctime(\n", - " dataarray=ds_subset.delta_ds_subsettime\n", + " dataarray=ds_subset.delta_time\n", " )\n", " # Convert xarray.Dataset to pandas.DataFrame for easier analysis\n", " df_many: pd.DataFrame = ds_subset.to_dataframe().dropna()\n", diff --git a/atlxi_dhdt.py b/atlxi_dhdt.py index 0de4d0a..cdbea52 100644 --- a/atlxi_dhdt.py +++ b/atlxi_dhdt.py @@ -401,7 +401,7 @@ ds_subset: xr.Dataset = region.subset(data=ds_dhdt) # Add a UTC_time column to the dataframe ds_subset["utc_time"] = deepicedrain.deltatime_to_utctime( - dataarray=ds_subset.delta_ds_subsettime + dataarray=ds_subset.delta_time ) # Convert xarray.Dataset to pandas.DataFrame for easier analysis df_many: pd.DataFrame = ds_subset.to_dataframe().dropna() diff --git a/deepicedrain/__init__.py b/deepicedrain/__init__.py index c710f41..7284559 100644 --- a/deepicedrain/__init__.py +++ b/deepicedrain/__init__.py @@ -4,7 +4,7 @@ import deepicedrain import intake from deepicedrain.deltamath import calculate_delta, nan_linregress, nanptp -from deepicedrain.extraload import array_to_dataframe +from deepicedrain.extraload import array_to_dataframe, ndarray_to_parquet from deepicedrain.spatiotemporal import ( Region, deltatime_to_utctime, diff --git a/deepicedrain/extraload.py b/deepicedrain/extraload.py index 4e0e9e3..012621b 100644 --- a/deepicedrain/extraload.py +++ b/deepicedrain/extraload.py @@ -4,8 +4,13 @@ """ import pandas as pd +import dask +import zarr -def array_to_dataframe(array, colname: str = None, startcol: int = 0): + +def array_to_dataframe( + array: dask.array.core.Array, colname: str = None, startcol: int = 0 +): """ Converts a 1D or 2D data array into a tidy dataframe structure. An array of shape (m, n) will turn into a table with m rows and n columns. 
@@ -41,3 +46,74 @@ def array_to_dataframe(array, colname: str = None, startcol: int = 0):
     dataframe: pd.DataFrame = pd.DataFrame.from_records(data=array, columns=columns)
 
     return dataframe
+
+
+def ndarray_to_parquet(
+    ndarray,
+    parquetpath: str,
+    variables: list = None,
+    dropnacols: list = None,
+    engine: str = "pyarrow",
+    **kwargs,
+) -> pd.DataFrame:
+    """
+    Converts an n-dimensional xarray Dataset or Zarr Group into an Apache
+    Parquet columnar file via an intermediate Dask/Pandas DataFrame format.
+    This is a convenience function that wraps around array_to_dataframe,
+    intended to make converting any number of arrays easier.
+
+    Parameters
+    ----------
+    ndarray : xarray.Dataset or zarr.hierarchy.Group
+        An n-dimensional array in xarray containing several coordinate and data
+        variables, or a Zarr group containing several variables.
+    parquetpath : str
+        Filepath to where the resulting parquet file will be stored.
+    variables : list
+        Name(s) of the variables/columns that will be stored to the parquet
+        file. If not provided, all the variables in the dataset or zarr group
+        will be stored.
+    dropnacols : list
+        Drop rows containing NaN values in these fields before saving to the
+        Parquet file.
+    engine : str
+        Parquet library to use. Choose from 'auto', 'fastparquet', 'pyarrow'.
+        Default is "pyarrow".
+    **kwargs : dict
+        Extra options to be passed on to pandas.DataFrame.to_parquet.
+
+    Returns
+    -------
+    df : pandas.DataFrame
+        The fully computed dataframe that was also saved to the Parquet file.
+
+    """
+    if variables is None:
+        try:
+            variables = [varname for varname, _ in ndarray.arrays()]
+        except AttributeError:
+            variables = [c for c in ndarray.coords] + [d for d in ndarray.data_vars]
+
+    if isinstance(ndarray, zarr.hierarchy.Group):
+        array_func = lambda varname: dask.array.from_zarr(ndarray[varname])
+    else:
+        array_func = lambda varname: ndarray[varname].data
+
+    dataframes: list = [
+        array_to_dataframe(array=array_func(varname), colname=varname, startcol=1)
+        for varname in variables
+    ]
+    dataframe: dask.dataframe.core.DataFrame = dask.dataframe.concat(
+        dfs=dataframes, axis="columns"
+    )
+    if dropnacols:
+        dataframe = dataframe.dropna(subset=dropnacols)
+
+    # Convert to pandas DataFrame first before saving to a single binary
+    # parquet file, rather than going directly from a Dask DataFrame to a
+    # series of parquet files in a parquet folder. This ensures that cudf can
+    # read it later, see https://github.com/rapidsai/cudf/issues/1688
+    df: pd.DataFrame = dataframe.compute()
+    df.to_parquet(path=parquetpath, engine=engine, **kwargs)
+
+    return df
diff --git a/deepicedrain/tests/test_array_to_dataframe.py b/deepicedrain/tests/test_array_to_dataframe.py
deleted file mode 100644
index 344846a..0000000
--- a/deepicedrain/tests/test_array_to_dataframe.py
+++ /dev/null
@@ -1,37 +0,0 @@
-"""
-Tests the array_to_dataframe function
-"""
-import dask
-import numpy as np
-import pandas as pd
-import pytest
-
-from deepicedrain import array_to_dataframe
-
-
-@pytest.mark.parametrize("shape", [(10, 1), (10, 2)])
-def test_numpy_array_to_pandas_dataframe(shape):
-    """
-    Test converting from a numpy.array to a pandas.Dataframe, and ensure that
-    the colname argument works. 
-    """
-    array: np.ndarray = np.ones(shape=shape)
-    dataframe = array_to_dataframe(array=array)
-
-    assert isinstance(dataframe, pd.DataFrame)
-    assert len(dataframe.columns) == shape[1]
-    assert dataframe.columns.to_list() == [str(i) for i in range(shape[1])]
-
-
-@pytest.mark.parametrize("shape", [(10, 1), (10, 2)])
-def test_dask_array_to_dask_dataframe(shape):
-    """
-    Test converting from a dask.array to a dask.dataframe, and ensure that the
-    startcol argument works.
-    """
-    array: dask.array.core.Array = dask.array.ones(shape=shape, name="varname")
-    dataframe = array_to_dataframe(array=array, startcol=1)
-
-    assert isinstance(dataframe, dask.dataframe.core.DataFrame)
-    assert len(dataframe.columns) == shape[1]
-    assert dataframe.columns.to_list() == [f"varname_{i+1}" for i in range(shape[1])]
diff --git a/deepicedrain/tests/test_ndarray_to_dataframe.py b/deepicedrain/tests/test_ndarray_to_dataframe.py
new file mode 100644
index 0000000..308a4f3
--- /dev/null
+++ b/deepicedrain/tests/test_ndarray_to_dataframe.py
@@ -0,0 +1,121 @@
+"""
+Tests various conversions from n-dimensional arrays to columnar dataframe table
+structures.
+"""
+import os
+import tempfile
+
+import numpy as np
+import pandas as pd
+import pytest
+import xarray as xr
+
+import dask
+import zarr
+from deepicedrain import array_to_dataframe, catalog, ndarray_to_parquet
+
+
+@pytest.fixture(scope="module", name="dataset")
+def fixture_dataset():
+    """
+    Load the sample ICESat-2 ATL11 data into an xarray.Dataset, and clean it
+    up to allow saving to other formats like Zarr.
+    """
+    dataset: xr.Dataset = catalog.test_data.atl11_test_case.to_dask()
+    for key in dataset.variables:
+        assert isinstance(dataset[key].DIMENSION_LABELS, np.ndarray)
+        dataset[key].attrs["DIMENSION_LABELS"] = (
+            dataset[key].attrs["DIMENSION_LABELS"].astype(str)
+        )
+
+    return dataset
+
+
+@pytest.mark.parametrize("shape", [(10, 1), (10, 2)])
+def test_numpy_array_to_pandas_dataframe(shape):
+    """
+    Test converting from a numpy.array to a pandas.DataFrame, and ensure that
+    the colname argument works.
+    """
+    array: np.ndarray = np.ones(shape=shape)
+    dataframe = array_to_dataframe(array=array)
+
+    assert isinstance(dataframe, pd.DataFrame)
+    assert len(dataframe.columns) == shape[1]
+    assert dataframe.columns.to_list() == [str(i) for i in range(shape[1])]
+
+
+@pytest.mark.parametrize("shape", [(10, 1), (10, 2)])
+def test_dask_array_to_dask_dataframe(shape):
+    """
+    Test converting from a dask.array to a dask.dataframe, and ensure that the
+    startcol argument works.
+    """
+    array: dask.array.core.Array = dask.array.ones(shape=shape, name="varname")
+    dataframe = array_to_dataframe(array=array, startcol=1)
+
+    assert isinstance(dataframe, dask.dataframe.core.DataFrame)
+    assert len(dataframe.columns) == shape[1]
+    assert dataframe.columns.to_list() == [f"varname_{i+1}" for i in range(shape[1])]
+
+
+def test_xarray_dataset_to_parquet_table(dataset):
+    """
+    Test converting from an xarray Dataset to a parquet table, specifying a
+    list of variables to store and setting 'snappy' compression. 
+ """ + with tempfile.TemporaryDirectory() as tmpdir: + parquetpath: str = os.path.join(tmpdir, "temp.parquet") + ndarray_to_parquet( + ndarray=dataset, + parquetpath=parquetpath, + variables=["longitude", "latitude", "h_corr", "h_corr_sigma"], + compression="snappy", + ) + + df: dask.dataframe.core.DataFrame = dask.dataframe.read_parquet( + path=parquetpath + ) + assert len(df) == 1404 + assert list(df.columns) == [ + "longitude", + "latitude", + "h_corr_1", + "h_corr_2", + "h_corr_sigma_1", + "h_corr_sigma_2", + ] + assert all(np.issubdtype(dtype, np.float64) for dtype in df.dtypes) + + +def test_zarr_array_to_parquet_table(dataset): + """ + Test converting from a zarr array to a parquet table, specifying a list of + variables to store and setting 'snappy' compression. + """ + with tempfile.TemporaryDirectory() as tmpdir: + zarrstore: str = os.path.join(tmpdir, "temp.zarr") + dataset.to_zarr(store=zarrstore, consolidated=True) + zarrarray: zarr.hierarchy.Group = zarr.open_consolidated(store=zarrstore) + + parquetpath: str = os.path.join(tmpdir, "temp.parquet") + ndarray_to_parquet( + ndarray=zarrarray, + parquetpath=parquetpath, + variables=["longitude", "latitude", "h_corr", "delta_time"], + compression="snappy", + ) + + df: dask.dataframe.core.DataFrame = dask.dataframe.read_parquet( + path=parquetpath + ) + assert len(df) == 1404 + assert list(df.columns) == [ + "longitude", + "latitude", + "h_corr_1", + "h_corr_2", + "delta_time_1", + "delta_time_2", + ] + assert all(np.issubdtype(dtype, np.float64) for dtype in df.dtypes)
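
P.S. A rough usage sketch of the new ndarray_to_parquet function, for reference.
It assumes this branch of deepicedrain is installed; the toy dataset, variable
names and output filepath below are made up purely for illustration (the real
workflow uses the ATL11 data subset in atlxi_dhdt.py):

    import numpy as np
    import xarray as xr

    from deepicedrain import ndarray_to_parquet

    # Toy ATL11-like dataset: heights over 2 cycles at 10 reference points.
    # Calling .chunk() makes every variable dask-backed, matching how the
    # tests above exercise the function with dask-backed ATL11 data.
    dataset: xr.Dataset = xr.Dataset(
        data_vars={"h_corr": (("ref_pt", "cycle_number"), np.random.rand(10, 2))},
        coords={
            "longitude": ("ref_pt", np.linspace(88.0, 89.0, num=10)),
            "latitude": ("ref_pt", np.linspace(-67.0, -66.0, num=10)),
        },
    ).chunk()

    # Flatten the chosen variables into columns and save a single Parquet file
    df = ndarray_to_parquet(
        ndarray=dataset,
        parquetpath="temp.parquet",  # made-up output path
        variables=["longitude", "latitude", "h_corr"],
        dropnacols=["latitude"],  # drop rows where latitude is NaN
        compression="snappy",  # extra keyword passed on to pandas' to_parquet
    )
    print(df.columns)  # longitude, latitude, h_corr_1, h_corr_2

The 2D h_corr variable gets split into one column per cycle (h_corr_1, h_corr_2)
because of the startcol=1 setting hardcoded inside ndarray_to_parquet.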