Allow non-default indexes in to_parquet.
...when supported by the underlying engine.
Fixes pandas-dev#18581
dhirschfeld committed Dec 9, 2017
2 parents 27a64b2 + 4b2072c commit 5afb7e8
Showing 7 changed files with 184 additions and 153 deletions.
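Below, a minimal sketch of what this change enables (a hypothetical script; it assumes pyarrow >= 0.7.0 is installed, and the file name is illustrative):

```python
import pandas as pd

# A frame with a named, non-default index: previously rejected by
# to_parquet, now round-tripped when the engine supports it.
df = pd.DataFrame({'a': [1, 2, 3]},
                  index=pd.Index([10, 20, 30], name='my_index'))

df.to_parquet('example.parquet', engine='pyarrow')
result = pd.read_parquet('example.parquet', engine='pyarrow')
assert result.index.name == 'my_index'
```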
2 changes: 1 addition & 1 deletion ci/requirements-2.7.sh
@@ -4,4 +4,4 @@ source activate pandas

echo "install 27"

conda install -n pandas -c conda-forge feather-format pyarrow=0.4.1 fastparquet
conda install -n pandas -c conda-forge feather-format pyarrow=0.7.0 fastparquet
2 changes: 1 addition & 1 deletion ci/requirements-3.5.sh
@@ -8,4 +8,4 @@ echo "install 35"
conda remove -n pandas python-dateutil --force
pip install python-dateutil

conda install -n pandas -c conda-forge feather-format pyarrow=0.5.0
conda install -n pandas -c conda-forge feather-format pyarrow=0.7.0
2 changes: 1 addition & 1 deletion doc/source/install.rst
@@ -229,7 +229,7 @@ Optional Dependencies
* `xarray <http://xarray.pydata.org>`__: pandas like handling for > 2 dims, needed for converting Panels to xarray objects. Version 0.7.0 or higher is recommended.
* `PyTables <http://www.pytables.org>`__: necessary for HDF5-based storage. Version 3.0.0 or higher required, Version 3.2.1 or higher highly recommended.
* `Feather Format <https://github.com/wesm/feather>`__: necessary for feather-based storage, version 0.3.1 or higher.
* `Apache Parquet <https://parquet.apache.org/>`__, either `pyarrow <http://arrow.apache.org/docs/python/>`__ (>= 0.4.1) or `fastparquet <https://fastparquet.readthedocs.io/en/latest>`__ (>= 0.0.6) for parquet-based storage. The `snappy <https://pypi.python.org/pypi/python-snappy>`__ and `brotli <https://pypi.python.org/pypi/brotlipy>`__ libraries are available for compression support.
* `Apache Parquet <https://parquet.apache.org/>`__, either `pyarrow <http://arrow.apache.org/docs/python/>`__ (>= 0.7.0) or `fastparquet <https://fastparquet.readthedocs.io/en/latest>`__ (>= 0.1.0) for parquet-based storage. The `snappy <https://pypi.python.org/pypi/python-snappy>`__ and `brotli <https://pypi.python.org/pypi/brotlipy>`__ libraries are available for compression support.
* `SQLAlchemy <http://www.sqlalchemy.org>`__: for SQL database support. Version 0.8.1 or higher recommended. Besides SQLAlchemy, you also need a database specific driver. You can find an overview of supported drivers for each SQL dialect in the `SQLAlchemy docs <http://docs.sqlalchemy.org/en/latest/dialects/index.html>`__. Some common drivers are:

* `psycopg2 <http://initd.org/psycopg/>`__: for PostgreSQL
5 changes: 1 addition & 4 deletions doc/source/io.rst
@@ -4504,11 +4504,8 @@ dtypes, including extension dtypes such as datetime with tz.

Several caveats.

- The format will NOT write an ``Index``, or ``MultiIndex`` for the
``DataFrame`` and will raise an error if a non-default one is provided. You
can ``.reset_index()`` to store the index or ``.reset_index(drop=True)`` to
ignore it.
- Duplicate column names and non-string column names are not supported
- Index level names, if specified, must be strings (see the sketch below)
- Categorical dtypes can be serialized to parquet, but will de-serialize as ``object`` dtype.
- Unsupported types include ``Period`` and actual Python object types; these will raise a helpful error message
on an attempt at serialization.
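As a sketch of the index-name rule (hypothetical frame and file name):

```python
import pandas as pd

df = pd.DataFrame({'a': [1, 2]})
df.index.name = 0  # a non-string index level name

# Raises ValueError: Index level names must be strings
df.to_parquet('bad.parquet')
```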
4 changes: 4 additions & 0 deletions doc/source/whatsnew/v0.22.0.txt
@@ -326,4 +326,8 @@ Other
^^^^^

- Improved error message when attempting to use a Python keyword as an identifier in a ``numexpr`` backed query (:issue:`18221`)
- ``DataFrame.to_parquet`` now allows a non-default index to be written when the underlying engine supports it (:issue:`18581`)
212 changes: 124 additions & 88 deletions pandas/io/parquet.py
@@ -3,7 +3,8 @@
from warnings import catch_warnings
from distutils.version import LooseVersion
from pandas import DataFrame, RangeIndex, Int64Index, get_option
from pandas.compat import range
from pandas.compat import range, string_types
from pandas.core.common import AbstractMethodError
from pandas.io.common import get_filepath_or_buffer


@@ -34,82 +35,152 @@ def get_engine(engine):
return FastParquetImpl()


class PyArrowImpl(object):
class BaseImpl(object):

api = None # module

@staticmethod
def validate_dataframe(df):
if not isinstance(df, DataFrame):
raise ValueError("to_parquet only supports IO with DataFrames")
# must have value column names (strings only)
if df.columns.inferred_type not in {'string', 'unicode'}:
raise ValueError("parquet must have string column names")
# index level names must be strings
valid_names = all(
isinstance(name, string_types)
for name in df.index.names
if name is not None
)
if not valid_names:
raise ValueError("Index level names must be strings")

def write(self, df, path, compression, **kwargs):
raise AbstractMethodError(self)

def read(self, path, columns=None, **kwargs):
raise AbstractMethodError(self)


class PyArrowImpl(BaseImpl):

def __init__(self):
# since pandas is a dependency of pyarrow
# we need to import on first use

try:
import pyarrow
import pyarrow.parquet
except ImportError:
raise ImportError("pyarrow is required for parquet support\n\n"
"you can install via conda\n"
"conda install pyarrow -c conda-forge\n"
"\nor via pip\n"
"pip install -U pyarrow\n")

if LooseVersion(pyarrow.__version__) < LooseVersion('0.4.1'):
raise ImportError("pyarrow >= 0.4.1 is required for parquet"
"support\n\n"
"you can install via conda\n"
"conda install pyarrow -c conda-forge\n"
"\nor via pip\n"
"pip install -U pyarrow\n")

self._pyarrow_lt_050 = (LooseVersion(pyarrow.__version__) <
LooseVersion('0.5.0'))
self._pyarrow_lt_060 = (LooseVersion(pyarrow.__version__) <
LooseVersion('0.6.0'))
raise ImportError(
"pyarrow is required for parquet support\n\n"
"you can install via conda\n"
"conda install pyarrow -c conda-forge\n"
"\nor via pip\n"
"pip install -U pyarrow\n"
)
if LooseVersion(pyarrow.__version__) < '0.4.1':
raise ImportError(
"pyarrow >= 0.4.1 is required for parquet support\n\n"
"you can install via conda\n"
"conda install pyarrow -c conda-forge\n"
"\nor via pip\n"
"pip install -U pyarrow\n"
)
self._pyarrow_lt_070 = (
LooseVersion(pyarrow.__version__) < LooseVersion('0.7.0')
)
self.api = pyarrow

def write(self, df, path, compression='snappy',
coerce_timestamps='ms', **kwargs):
self.validate_dataframe(df)
if self._pyarrow_lt_070:
self._validate_write_lt_070(
df, path, compression, coerce_timestamps, **kwargs
)
path, _, _ = get_filepath_or_buffer(path)
if self._pyarrow_lt_060:
table = self.api.Table.from_pandas(df, timestamps_to_ms=True)
self.api.parquet.write_table(
table, path, compression=compression, **kwargs)

else:
table = self.api.Table.from_pandas(df)
self.api.parquet.write_table(
table, path, compression=compression,
coerce_timestamps=coerce_timestamps, **kwargs)
table = self.api.Table.from_pandas(df)
self.api.parquet.write_table(
table, path, compression=compression,
coerce_timestamps=coerce_timestamps, **kwargs)
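# Note: for pyarrow >= 0.7.0, Table.from_pandas serializes the index
# (including MultiIndex levels) as ordinary columns and records them
# in the pandas metadata, which read() below uses to restore it.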

def read(self, path, columns=None, **kwargs):
path, _, _ = get_filepath_or_buffer(path)
return self.api.parquet.read_table(path, columns=columns,
**kwargs).to_pandas()


class FastParquetImpl(object):
parquet_file = self.api.parquet.ParquetFile(path)
if self._pyarrow_lt_070:
parquet_file.path = path
return self._read_lt_070(parquet_file, columns, **kwargs)
kwargs['use_pandas_metadata'] = True
return parquet_file.read(columns=columns, **kwargs).to_pandas()


def _validate_write_lt_070(self, df, path, compression='snappy',
coerce_timestamps='ms', **kwargs):
# Compatibility shim for pyarrow < 0.7.0
# TODO: Remove in pandas 0.22.0
from pandas.core.indexes.multi import MultiIndex
if isinstance(df.index, MultiIndex):
msg = "Mulit-index DataFrames are only supported with pyarrow >= 0.7.0"
raise ValueError(msg)
# Validate index
if not isinstance(df.index, Int64Index):
msg = (
"parquet does not support serializing {} for the index;"
"you can .reset_index() to make the index into column(s)"
)
raise ValueError(msg.format(type(df.index)))
if not df.index.equals(RangeIndex(len(df))):
raise ValueError(
"parquet does not support serializing a non-default index "
"for the index; you can .reset_index() to make the index "
"into column(s)"
)
if df.index.name is not None:
raise ValueError(
"parquet does not serialize index meta-data "
"on a default index"
)

def _read_lt_070(self, parquet_file, columns, **kwargs):
# Compatibility shim for pyarrow < 0.7.0
# TODO: Remove in pandas 0.22.0
from itertools import chain
import json
if columns is not None:
metadata = json.loads(parquet_file.metadata.metadata[b'pandas'])
columns = set(chain(columns, metadata['index_columns']))
kwargs['columns'] = columns
return self.api.parquet.read_table(
    parquet_file.path, **kwargs
).to_pandas()
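# Note: the b'pandas' metadata blob is JSON of (abridged) the form
#   {"index_columns": ["__index_level_0__"], "columns": [...], ...}
# so a requested column subset must be widened to include the index
# columns, otherwise the index would be dropped from the result.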


class FastParquetImpl(BaseImpl):

def __init__(self):
# since pandas is a dependency of fastparquet
# we need to import on first use

try:
import fastparquet
except ImportError:
raise ImportError("fastparquet is required for parquet support\n\n"
"you can install via conda\n"
"conda install fastparquet -c conda-forge\n"
"\nor via pip\n"
"pip install -U fastparquet")

if LooseVersion(fastparquet.__version__) < LooseVersion('0.1.0'):
raise ImportError("fastparquet >= 0.1.0 is required for parquet "
"support\n\n"
"you can install via conda\n"
"conda install fastparquet -c conda-forge\n"
"\nor via pip\n"
"pip install -U fastparquet")

raise ImportError(
"fastparquet is required for parquet support\n\n"
"you can install via conda\n"
"conda install fastparquet -c conda-forge\n"
"\nor via pip\n"
"pip install -U fastparquet"
)
if LooseVersion(fastparquet.__version__) < '0.1.0':
raise ImportError(
"fastparquet >= 0.1.0 is required for parquet "
"support\n\n"
"you can install via conda\n"
"conda install fastparquet -c conda-forge\n"
"\nor via pip\n"
"pip install -U fastparquet"
)
self.api = fastparquet

def write(self, df, path, compression='snappy', **kwargs):
self.validate_dataframe(df)
# thriftpy/protocol/compact.py:339:
# DeprecationWarning: tostring() is deprecated.
# Use tobytes() instead.
@@ -120,7 +191,8 @@ def write(self, df, path, compression='snappy', **kwargs):

def read(self, path, columns=None, **kwargs):
path, _, _ = get_filepath_or_buffer(path)
return self.api.ParquetFile(path).to_pandas(columns=columns, **kwargs)
parquet_file = self.api.ParquetFile(path)
return parquet_file.to_pandas(columns=columns, **kwargs)


def to_parquet(df, path, engine='auto', compression='snappy', **kwargs):
Expand All @@ -141,43 +213,7 @@ def to_parquet(df, path, engine='auto', compression='snappy', **kwargs):
kwargs
Additional keyword arguments passed to the engine
"""

impl = get_engine(engine)

if not isinstance(df, DataFrame):
raise ValueError("to_parquet only support IO with DataFrames")

valid_types = {'string', 'unicode'}

# validate index
# --------------

# validate that we have only a default index
# raise on anything else as we don't serialize the index

if not isinstance(df.index, Int64Index):
raise ValueError("parquet does not support serializing {} "
"for the index; you can .reset_index()"
"to make the index into column(s)".format(
type(df.index)))

if not df.index.equals(RangeIndex.from_range(range(len(df)))):
raise ValueError("parquet does not support serializing a "
"non-default index for the index; you "
"can .reset_index() to make the index "
"into column(s)")

if df.index.name is not None:
raise ValueError("parquet does not serialize index meta-data on a "
"default index")

# validate columns
# ----------------

# must have value column names (strings only)
if df.columns.inferred_type not in valid_types:
raise ValueError("parquet must have string column names")

return impl.write(df, path, compression=compression, **kwargs)


Expand Down
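For reference, a sketch of engine selection (a hypothetical session; it assumes fastparquet >= 0.1.0 and python-snappy are installed, and the file name is illustrative): with ``engine='auto'`` (the default), ``get_engine`` consults the ``io.parquet.engine`` option, trying pyarrow first and falling back to fastparquet.

```python
import pandas as pd

# Prefer fastparquet for all parquet IO in this session.
pd.set_option('io.parquet.engine', 'fastparquet')

df = pd.DataFrame({'a': [1, 2, 3]})
df.to_parquet('example_fp.parquet', compression='snappy')
```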
