Commit
✨ Ndarray to Parquet convenience function
Building on top of eb61ff6, but for n-dimensional arrays, and writing the dataframe to Parquet too! This function might be a little too convenient (read: contains hardcoding), but it smooths out some of the rough edges in terms of PyData file format interoperability. Should contribute this somewhere upstream when I get the time.
weiji14 committed Aug 20, 2020
1 parent 14b020c commit 2d1c8e7
Showing 6 changed files with 201 additions and 41 deletions.
2 changes: 1 addition & 1 deletion atlxi_dhdt.ipynb
@@ -904,7 +904,7 @@
 "    ds_subset: xr.Dataset = region.subset(data=ds_dhdt)\n",
 "    # Add a UTC_time column to the dataframe\n",
 "    ds_subset[\"utc_time\"] = deepicedrain.deltatime_to_utctime(\n",
-"        dataarray=ds_subset.delta_ds_subsettime\n",
+"        dataarray=ds_subset.delta_time\n",
 "    )\n",
 "    # Convert xarray.Dataset to pandas.DataFrame for easier analysis\n",
 "    df_many: pd.DataFrame = ds_subset.to_dataframe().dropna()\n",
2 changes: 1 addition & 1 deletion atlxi_dhdt.py
@@ -401,7 +401,7 @@
     ds_subset: xr.Dataset = region.subset(data=ds_dhdt)
     # Add a UTC_time column to the dataframe
     ds_subset["utc_time"] = deepicedrain.deltatime_to_utctime(
-        dataarray=ds_subset.delta_ds_subsettime
+        dataarray=ds_subset.delta_time
     )
     # Convert xarray.Dataset to pandas.DataFrame for easier analysis
     df_many: pd.DataFrame = ds_subset.to_dataframe().dropna()
2 changes: 1 addition & 1 deletion deepicedrain/__init__.py
@@ -4,7 +4,7 @@
 import deepicedrain
 import intake
 from deepicedrain.deltamath import calculate_delta, nan_linregress, nanptp
-from deepicedrain.extraload import array_to_dataframe
+from deepicedrain.extraload import array_to_dataframe, ndarray_to_parquet
 from deepicedrain.spatiotemporal import (
     Region,
     deltatime_to_utctime,
78 changes: 77 additions & 1 deletion deepicedrain/extraload.py
@@ -4,8 +4,13 @@
 """
 import pandas as pd
 
+import dask
+import zarr
 
-def array_to_dataframe(array, colname: str = None, startcol: int = 0):
+
+def array_to_dataframe(
+    array: dask.array.core.Array, colname: str = None, startcol: int = 0
+):
     """
     Converts a 1D or 2D data array into a tidy dataframe structure.
     An array of shape (m, n) will turn into a table with m rows and n columns.
@@ -41,3 +46,74 @@ def array_to_dataframe(array, colname: str = None, startcol: int = 0):
    dataframe: pd.DataFrame = pd.DataFrame.from_records(data=array, columns=columns)

    return dataframe


def ndarray_to_parquet(
    ndarray,
    parquetpath: str,
    variables: list = None,
    dropnacols: list = None,
    engine: str = "pyarrow",
    **kwargs,
) -> pd.DataFrame:
"""
Converts an n-dimensional xarray Dataset or Zarr Array into an Apache
Parquet columnar file via an intermediate Dask/Pandas DataFrame format.
This is a convenience function that wraps around array_to_dataframe,
intended to make converting n number of arrays easier.
Parameters
----------
ndarray : xarray.Dataset or zarr.hierarchy.Group
An n-dimensional array in xarray containing several coordinate and data
variables, or a Zarr array containing several variables.
parquetpath : str
Filepath to where the resulting parquet file will be stored.
variables : list
Name(s) of the variables/columns that will be stored to the parquet
file. If not provided, all the variables in the zarr group will be
stored.
dropnacols : list
Drop rows containing NaN values in these fields before saving to the
Parquet file.
engine : str
Parquet library to use. Choose from 'auto', 'fastparquet', 'pyarrow'.
Default is "pyarrow".
**kwargs : dict
Extra options to be passed on to pandas.DataFrame.to_parquet.
Returns
-------
point_labels : cudf.Series
A column of labels that indicates which polygon the points fall into.
"""
    if variables is None:
        try:
            variables = [varname for varname, _ in ndarray.arrays()]
        except AttributeError:
            variables = [c for c in ndarray.coords] + [d for d in ndarray.data_vars]

    if isinstance(ndarray, zarr.hierarchy.Group):
        array_func = lambda varname: dask.array.from_zarr(ndarray[varname])
    else:
        array_func = lambda varname: ndarray[varname].data

    dataframes: list = [
        array_to_dataframe(array=array_func(varname), colname=varname, startcol=1)
        for varname in variables
    ]
    dataframe: dask.dataframe.core.DataFrame = dask.dataframe.concat(
        dfs=dataframes, axis="columns"
    )
    if dropnacols:
        dataframe = dataframe.dropna(subset=dropnacols)

    # Convert to pandas DataFrame first before saving to a single binary
    # parquet file, rather than going directly from a Dask DataFrame to a
    # series of parquet files in a parquet folder. This ensures that cudf can
    # read it later, see https://github.com/rapidsai/cudf/issues/1688
    df: pd.DataFrame = dataframe.compute()
    df.to_parquet(path=parquetpath, engine=engine, **kwargs)

    return df
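
As a rough usage sketch of the new helper (the filename, dataset and variable names below are made up for illustration, not taken from the repository):

import xarray as xr

from deepicedrain import ndarray_to_parquet

# Hypothetical ATL11-style dataset with 1D coordinates and a 2D h_corr variable
dataset: xr.Dataset = xr.open_dataset("ATL11_sample.nc")

# Flatten the chosen variables into columns and write a single Parquet file;
# extra keyword arguments such as compression are passed through to
# pandas.DataFrame.to_parquet
df = ndarray_to_parquet(
    ndarray=dataset,
    parquetpath="ATL11_sample.parquet",
    variables=["longitude", "latitude", "h_corr"],
    compression="snappy",
)
print(df.head())  # 2D variables appear as flattened h_corr_1, h_corr_2 columns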
37 changes: 0 additions & 37 deletions deepicedrain/tests/test_array_to_dataframe.py

This file was deleted.

121 changes: 121 additions & 0 deletions deepicedrain/tests/test_ndarray_to_dataframe.py
@@ -0,0 +1,121 @@
"""
Tests various conversions from n-dimensional arrays to columnar dataframe table
structures.
"""
import os
import tempfile

import numpy as np
import pandas as pd
import pytest
import xarray as xr

import dask
import zarr
from deepicedrain import array_to_dataframe, catalog, ndarray_to_parquet


@pytest.fixture(scope="module", name="dataset")
def fixture_dataset():
"""
Load the sample ICESat-2 ATL11 data into an xarray, and clean it up to
allow saving to other formats like Zarr
"""
dataset: xr.Dataset = catalog.test_data.atl11_test_case.to_dask()
for key, variable in dataset.variables.items():
assert isinstance(dataset[key].DIMENSION_LABELS, np.ndarray)
dataset[key].attrs["DIMENSION_LABELS"] = (
dataset[key].attrs["DIMENSION_LABELS"].astype(str)
)

return dataset


@pytest.mark.parametrize("shape", [(10, 1), (10, 2)])
def test_numpy_array_to_pandas_dataframe(shape):
"""
Test converting from a numpy.array to a pandas.Dataframe, and ensure that
the colname argument works.
"""
array: np.ndarray = np.ones(shape=shape)
dataframe = array_to_dataframe(array=array)

assert isinstance(dataframe, pd.DataFrame)
assert len(dataframe.columns) == shape[1]
assert dataframe.columns.to_list() == [str(i) for i in range(shape[1])]


@pytest.mark.parametrize("shape", [(10, 1), (10, 2)])
def test_dask_array_to_dask_dataframe(shape):
"""
Test converting from a dask.array to a dask.dataframe, and ensure that the
startcol argument works.
"""
array: dask.array.core.Array = dask.array.ones(shape=shape, name="varname")
dataframe = array_to_dataframe(array=array, startcol=1)

assert isinstance(dataframe, dask.dataframe.core.DataFrame)
assert len(dataframe.columns) == shape[1]
assert dataframe.columns.to_list() == [f"varname_{i+1}" for i in range(shape[1])]


def test_xarray_dataset_to_parquet_table(dataset):
"""
Test converting from an xarray Dataset to a parquet table, specifying a
list of variables to store and setting 'snappy' compression.
"""
with tempfile.TemporaryDirectory() as tmpdir:
parquetpath: str = os.path.join(tmpdir, "temp.parquet")
ndarray_to_parquet(
ndarray=dataset,
parquetpath=parquetpath,
variables=["longitude", "latitude", "h_corr", "h_corr_sigma"],
compression="snappy",
)

df: dask.dataframe.core.DataFrame = dask.dataframe.read_parquet(
path=parquetpath
)
assert len(df) == 1404
assert list(df.columns) == [
"longitude",
"latitude",
"h_corr_1",
"h_corr_2",
"h_corr_sigma_1",
"h_corr_sigma_2",
]
assert all(np.issubdtype(dtype, np.float64) for dtype in df.dtypes)


def test_zarr_array_to_parquet_table(dataset):
"""
Test converting from a zarr array to a parquet table, specifying a list of
variables to store and setting 'snappy' compression.
"""
with tempfile.TemporaryDirectory() as tmpdir:
zarrstore: str = os.path.join(tmpdir, "temp.zarr")
dataset.to_zarr(store=zarrstore, consolidated=True)
zarrarray: zarr.hierarchy.Group = zarr.open_consolidated(store=zarrstore)

parquetpath: str = os.path.join(tmpdir, "temp.parquet")
ndarray_to_parquet(
ndarray=zarrarray,
parquetpath=parquetpath,
variables=["longitude", "latitude", "h_corr", "delta_time"],
compression="snappy",
)

df: dask.dataframe.core.DataFrame = dask.dataframe.read_parquet(
path=parquetpath
)
assert len(df) == 1404
assert list(df.columns) == [
"longitude",
"latitude",
"h_corr_1",
"h_corr_2",
"delta_time_1",
"delta_time_2",
]
assert all(np.issubdtype(dtype, np.float64) for dtype in df.dtypes)
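
The compute-to-pandas step in ndarray_to_parquet exists so that the output is a single Parquet file that cudf can ingest (see the comment referencing rapidsai/cudf#1688 in extraload.py above). A minimal read-back sketch, assuming a RAPIDS environment with cudf installed and a temp.parquet file produced as in the tests:

import cudf

# Load the single-file Parquet output straight into GPU memory for further
# analysis; this is the interoperability case the code comment above targets.
gdf: cudf.DataFrame = cudf.read_parquet("temp.parquet")
print(gdf.columns)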
