ENH: add to/from_parquet with pyarrow & fastparquet
jreback committed Jul 27, 2017
1 parent 465c59f commit decffcf
Showing 18 changed files with 627 additions and 5 deletions.
1 change: 1 addition & 0 deletions ci/install_travis.sh
@@ -156,6 +156,7 @@ fi
echo
echo "[removing installed pandas]"
conda remove pandas -y --force
pip uninstall -y pandas

if [ "$BUILD_TEST" ]; then

2 changes: 1 addition & 1 deletion ci/requirements-2.7.sh
@@ -4,4 +4,4 @@ source activate pandas

echo "install 27"

conda install -n pandas -c conda-forge feather-format pyarrow=0.4.1
conda install -n pandas -c conda-forge feather-format pyarrow=0.4.1 fastparquet
4 changes: 2 additions & 2 deletions ci/requirements-3.5.sh
@@ -4,8 +4,8 @@ source activate pandas

echo "install 35"

conda install -n pandas -c conda-forge feather-format pyarrow=0.4.1

# pip install python-dateutil to get latest
conda remove -n pandas python-dateutil --force
pip install python-dateutil

conda install -n pandas -c conda-forge feather-format pyarrow=0.4.1
2 changes: 1 addition & 1 deletion ci/requirements-3.5_OSX.sh
@@ -4,4 +4,4 @@ source activate pandas

echo "install 35_OSX"

conda install -n pandas -c conda-forge feather-format==0.3.1
conda install -n pandas -c conda-forge feather-format==0.3.1 fastparquet
1 change: 1 addition & 0 deletions ci/requirements-3.6.pip
@@ -0,0 +1 @@
brotlipy
2 changes: 2 additions & 0 deletions ci/requirements-3.6.run
@@ -16,6 +16,8 @@ sqlalchemy
pymysql
feather-format
pyarrow
python-snappy
fastparquet
# psycopg2 (not avail on defaults ATM)
beautifulsoup4
s3fs
2 changes: 2 additions & 0 deletions ci/requirements-3.6_WIN.run
@@ -13,3 +13,5 @@ numexpr
pytables
matplotlib
blosc
fastparquet
pyarrow
1 change: 1 addition & 0 deletions doc/source/install.rst
@@ -236,6 +236,7 @@ Optional Dependencies
* `xarray <http://xarray.pydata.org>`__: pandas like handling for > 2 dims, needed for converting Panels to xarray objects. Version 0.7.0 or higher is recommended.
* `PyTables <http://www.pytables.org>`__: necessary for HDF5-based storage. Version 3.0.0 or higher required, Version 3.2.1 or higher highly recommended.
* `Feather Format <https://github.com/wesm/feather>`__: necessary for feather-based storage, version 0.3.1 or higher.
* ``Apache Parquet Format``, either `pyarrow <http://arrow.apache.org/docs/python/>`__ (>= 0.4.1) or `fastparquet <https://fastparquet.readthedocs.io/en/latest/>`__ (>= 0.0.6), necessary for parquet-based storage. The `snappy <https://pypi.python.org/pypi/python-snappy>`__ and `brotli <https://pypi.python.org/pypi/brotlipy>`__ libraries are available for compression support.
* `SQLAlchemy <http://www.sqlalchemy.org>`__: for SQL database support. Version 0.8.1 or higher recommended. Besides SQLAlchemy, you also need a database specific driver. You can find an overview of supported drivers for each SQL dialect in the `SQLAlchemy docs <http://docs.sqlalchemy.org/en/latest/dialects/index.html>`__. Some common drivers are:

* `psycopg2 <http://initd.org/psycopg/>`__: for PostgreSQL
71 changes: 71 additions & 0 deletions doc/source/io.rst
@@ -43,6 +43,7 @@ object. The corresponding ``writer`` functions are object methods that are acces
binary;`MS Excel <https://en.wikipedia.org/wiki/Microsoft_Excel>`__;:ref:`read_excel<io.excel_reader>`;:ref:`to_excel<io.excel_writer>`
binary;`HDF5 Format <https://support.hdfgroup.org/HDF5/whatishdf5.html>`__;:ref:`read_hdf<io.hdf5>`;:ref:`to_hdf<io.hdf5>`
binary;`Feather Format <https://github.com/wesm/feather>`__;:ref:`read_feather<io.feather>`;:ref:`to_feather<io.feather>`
binary;`Parquet Format <https://parquet.apache.org/>`__;:ref:`read_parquet<io.parquet>`;:ref:`to_parquet<io.parquet>`
binary;`Msgpack <http://msgpack.org/index.html>`__;:ref:`read_msgpack<io.msgpack>`;:ref:`to_msgpack<io.msgpack>`
binary;`Stata <https://en.wikipedia.org/wiki/Stata>`__;:ref:`read_stata<io.stata_reader>`;:ref:`to_stata<io.stata_writer>`
binary;`SAS <https://en.wikipedia.org/wiki/SAS_(software)>`__;:ref:`read_sas<io.sas_reader>`;
@@ -4550,6 +4551,76 @@ Read from a feather file.
import os
os.remove('example.feather')

.. _io.parquet:

Parquet
-------

.. versionadded:: 0.21.0

Parquet provides a sharded binary columnar serialization for data frames. It is designed to make reading and writing data
frames efficient, and to make sharing data across data analysis languages easy. Parquet can use a
variety of compression techniques to shrink the file size as much as possible while still maintaining good read performance.

Parquet is designed to faithfully serialize and de-serialize DataFrames, supporting all of the pandas
dtypes, including extension dtypes such as categorical and datetime with tz.

Several caveats:

- The format will NOT write an ``Index`` or ``MultiIndex`` for the ``DataFrame``, and will raise an
  error if a non-default one is provided. You can ``.reset_index()`` to store the index as columns.
- Duplicate column names and non-string column names are not supported.
- Unsupported types include ``Period`` and actual Python object types; these will raise a helpful error message
  on an attempt at serialization.

See the documentation for `pyarrow <http://arrow.apache.org/docs/python/>`__ and `fastparquet <https://fastparquet.readthedocs.io/en/latest/>`__.

.. note::

   These engines are very similar and should read/write nearly identical parquet format files.
   The libraries differ in their underlying dependencies: ``fastparquet`` uses ``numba``, while ``pyarrow`` uses a C library.

   TODO: differing options to write non-standard columns & null treatment

.. ipython:: python

   df = pd.DataFrame({'a': list('abc'),
                      'b': list(range(1, 4)),
                      'c': np.arange(3, 6).astype('u1'),
                      'd': np.arange(4.0, 7.0, dtype='float64'),
                      'e': [True, False, True],
                      'f': pd.Categorical(list('abc')),
                      'g': pd.date_range('20130101', periods=3),
                      'h': pd.date_range('20130101', periods=3, tz='US/Eastern'),
                      'i': pd.date_range('20130101', periods=3, freq='ns')})
   df
   df.dtypes

Write to a parquet file.

.. ipython:: python

   df.to_parquet('example_pa.parquet', engine='pyarrow')
   df.to_parquet('example_fp.parquet', engine='fastparquet')

Read from a parquet file.

.. ipython:: python

   result = pd.read_parquet('example_pa.parquet')
   result = pd.read_parquet('example_fp.parquet')

   # we preserve dtypes
   result.dtypes

.. ipython:: python
   :suppress:

   import os
   os.remove('example_pa.parquet')
   os.remove('example_fp.parquet')
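
A minimal sketch of the index caveat above, reusing the ``df`` built earlier; the file name is illustrative, and the commented line shows the failure mode:

    indexed = df.set_index('a')
    # indexed.to_parquet('example_idx.parquet')  # raises ValueError: a non-default index is not serialized
    indexed.reset_index().to_parquet('example_idx.parquet', engine='pyarrow')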
.. _io.sql:

SQL Queries
2 changes: 2 additions & 0 deletions doc/source/options.rst
@@ -414,6 +414,8 @@ io.hdf.default_format None default format writing format,
'table'
io.hdf.dropna_table True drop ALL nan rows when appending
to a table
io.parquet.engine pyarrow The engine to use as a default for
parquet reading and writing.
mode.chained_assignment warn Raise an exception, warn, or no
action if trying to use chained
assignment, The default is warn
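For illustration, the new ``io.parquet.engine`` option can be flipped at runtime like any other pandas option; this sketch assumes ``fastparquet`` is installed:

    import pandas as pd

    pd.set_option('io.parquet.engine', 'fastparquet')  # to_parquet / read_parquet calls without engine= now use fastparquet
    pd.get_option('io.parquet.engine')                 # -> 'fastparquet'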
1 change: 1 addition & 0 deletions doc/source/whatsnew/v0.21.0.txt
@@ -78,6 +78,7 @@ Other Enhancements
- :func:`DataFrame.select_dtypes` now accepts scalar values for include/exclude as well as list-like. (:issue:`16855`)
- :func:`date_range` now accepts 'YS' in addition to 'AS' as an alias for start of year (:issue:`9313`)
- :func:`date_range` now accepts 'Y' in addition to 'A' as an alias for end of year (:issue:`9313`)
- Integration with Apache Parquet, including a new top-level ``pd.read_parquet()`` function and a ``DataFrame.to_parquet()`` method; see :ref:`here <io.parquet>`.

.. _whatsnew_0210.api_breaking:

11 changes: 11 additions & 0 deletions pandas/core/config_init.py
@@ -462,3 +462,14 @@ def _register_xlsx(engine, other):
except ImportError:
# fallback
_register_xlsx('openpyxl', 'xlsxwriter')

# Set up the io.parquet specific configuration.
parquet_engine_doc = """
: string
The default parquet reader/writer engine. Available options:
'pyarrow', 'fastparquet', the default is 'pyarrow'
"""

with cf.config_prefix('io.parquet'):
cf.register_option('engine', 'pyarrow', parquet_engine_doc,
validator=is_one_of_factory(['pyarrow', 'fastparquet']))
23 changes: 23 additions & 0 deletions pandas/core/frame.py
@@ -1598,6 +1598,29 @@ def to_feather(self, fname):
from pandas.io.feather_format import to_feather
to_feather(self, fname)

def to_parquet(self, fname, engine=None, compression='snappy',
**kwargs):
"""
write out the binary parquet for DataFrames
.. versionadded:: 0.21.0
Parameters
----------
fname : str
string file path
engine : str, optional
The parquet engine, one of {'pyarrow', 'fastparquet'}
if None, will use the option: `io.parquet.engine`
compression : str, optional, default 'snappy'
compression method, includes {'gzip', 'snappy', 'brotli'}
kwargs passed to the engine
"""
from pandas.io.parquet import to_parquet
to_parquet(self, fname, engine,
compression=compression, **kwargs)

@Substitution(header='Write out column names. If a list of string is given, \
it is assumed to be aliases for the column names')
@Appender(fmt.docstring_to_string, indents=1)
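A brief usage sketch of the new method; the file names are illustrative, and the snappy/brotli compression codecs are assumed to be installed:

    import pandas as pd

    df = pd.DataFrame({'x': [1, 2, 3], 'y': list('abc')})
    df.to_parquet('data.parquet')  # engine taken from io.parquet.engine, snappy compression
    df.to_parquet('data_brotli.parquet', engine='pyarrow', compression='brotli')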
1 change: 1 addition & 0 deletions pandas/io/api.py
@@ -13,6 +13,7 @@
from pandas.io.sql import read_sql, read_sql_table, read_sql_query
from pandas.io.sas import read_sas
from pandas.io.feather_format import read_feather
from pandas.io.parquet import read_parquet
from pandas.io.stata import read_stata
from pandas.io.pickle import read_pickle, to_pickle
from pandas.io.packers import read_msgpack, to_msgpack
179 changes: 179 additions & 0 deletions pandas/io/parquet.py
@@ -0,0 +1,179 @@
""" parquet compat """

from warnings import catch_warnings
from distutils.version import LooseVersion
from pandas import DataFrame, RangeIndex, Int64Index, get_option
from pandas.compat import range
from pandas.io.common import get_filepath_or_buffer


def get_engine(engine):
""" return our implementation """

if engine is None:
engine = get_option('io.parquet.engine')

if engine not in ['pyarrow', 'fastparquet']:
raise ValueError("engine must be one of 'pyarrow', 'fastparquet'")

if engine == 'pyarrow':
return PyArrowImpl()
elif engine == 'fastparquet':
return FastParquetImpl()


class PyArrowImpl(object):

def __init__(self):
# since pandas is a dependency of pyarrow
# we need to import on first use

try:
import pyarrow
import pyarrow.parquet
except ImportError:
raise ImportError("pyarrow is required for parquet support\n\n"
"you can install via conda\n"
"conda install pyarrow -c conda-forge\n"
"\nor via pip\n"
"pip install pyarrow\n")

if LooseVersion(pyarrow.__version__) < '0.4.1':
raise ImportError("pyarrow >= 0.4.1 is required for parquet"
"support\n\n"
"you can install via conda\n"
"conda install pyarrow -c conda-forge\n"
"\nor via pip\n"
"pip install pyarrow\n")

self.api = pyarrow

def write(self, df, path, compression='snappy', **kwargs):
path, _, _ = get_filepath_or_buffer(path)
table = self.api.Table.from_pandas(df, timestamps_to_ms=True)
self.api.parquet.write_table(
table, path, compression=compression, **kwargs)

def read(self, path):
path, _, _ = get_filepath_or_buffer(path)
return self.api.parquet.read_table(path).to_pandas()


class FastParquetImpl(object):

def __init__(self):
# since pandas is a dependency of fastparquet
# we need to import on first use

try:
import fastparquet
except ImportError:
raise ImportError("fastparquet is required for parquet support\n\n"
"you can install via conda\n"
"conda install fastparquet -c conda-forge\n"
"\nor via pip\n"
"pip install fastparquet")

if LooseVersion(fastparquet.__version__) < '0.1.0':
raise ImportError("fastparquet >= 0.1.0 is required for parquet "
"support\n\n"
"you can install via conda\n"
"conda install fastparquet -c conda-forge\n"
"\nor via pip\n"
"pip install fastparquet")

self.api = fastparquet

def write(self, df, path, compression='snappy', **kwargs):
# thriftpy/protocol/compact.py:339:
# DeprecationWarning: tostring() is deprecated.
# Use tobytes() instead.
path, _, _ = get_filepath_or_buffer(path)
with catch_warnings(record=True):
self.api.write(path, df,
compression=compression, **kwargs)

def read(self, path):
path, _, _ = get_filepath_or_buffer(path)
return self.api.ParquetFile(path).to_pandas()


def to_parquet(df, path, engine=None, compression='snappy', **kwargs):
"""
Write a DataFrame to the parquet format.
Parameters
----------
df : DataFrame
path : string
File path
engine : str, optional
The parquet engine, one of {'pyarrow', 'fastparquet'}
if None, will use the option: `io.parquet.engine`
compression : str, optional, default 'snappy'
compression method, includes {'gzip', 'snappy', 'brotli'}
kwargs are passed to the engine
"""

impl = get_engine(engine)

if not isinstance(df, DataFrame):
raise ValueError("to_parquet only support IO with DataFrames")

valid_types = {'string', 'unicode'}

# validate index
# --------------

# validate that we have only a default index
# raise on anything else as we don't serialize the index

if not isinstance(df.index, Int64Index):
raise ValueError("parquet does not support serializing {} "
"for the index; you can .reset_index()"
"to make the index into column(s)".format(
type(df.index)))

if not df.index.equals(RangeIndex.from_range(range(len(df)))):
raise ValueError("parquet does not support serializing a "
"non-default index for the index; you "
"can .reset_index() to make the index "
"into column(s)")

if df.index.name is not None:
raise ValueError("parquet does not serialize index meta-data on a "
"default index")

# validate columns
# ----------------

# must have value column names (strings only)
if df.columns.inferred_type not in valid_types:
raise ValueError("parquet must have string column names")

return impl.write(df, path, compression=compression, **kwargs)


def read_parquet(path, engine=None, **kwargs):
"""
Load a parquet object from the file path, returning a DataFrame.
.. versionadded 0.21.0
Parameters
----------
path : string
File path
engine : str, optional
The parquet engine, one of {'pyarrow', 'fastparquet'}
if None, will use the option: `io.parquet.engine`
kwargs are passed to the engine
Returns
-------
DataFrame
"""

impl = get_engine(engine)
return impl.read(path)
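A round-trip sketch of the new module-level helpers; the file name is illustrative, and both engines are assumed to be installed:

    import pandas as pd
    from pandas.io.parquet import to_parquet, read_parquet

    df = pd.DataFrame({'a': list('xyz'), 'b': [1, 2, 3]})
    to_parquet(df, 'roundtrip.parquet', engine='pyarrow')             # equivalent to df.to_parquet(...)
    result = read_parquet('roundtrip.parquet', engine='fastparquet')  # per the io.rst note, either engine should read the file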
2 changes: 1 addition & 1 deletion pandas/tests/api/test_api.py
@@ -82,7 +82,7 @@ class TestPDApi(Base):
'read_gbq', 'read_hdf', 'read_html', 'read_json',
'read_msgpack', 'read_pickle', 'read_sas', 'read_sql',
'read_sql_query', 'read_sql_table', 'read_stata',
'read_table', 'read_feather']
'read_table', 'read_feather', 'read_parquet']

# top-level to_* funcs
funcs_to = ['to_datetime', 'to_msgpack',