BigQuery: Allow choice of compression when loading from dataframe (#8938)

* Allow choice of compression when loading from DF

* Mark parquet_compression parameter as [Beta]

* Support compression arg in dataframe_to_parquet()
plamut authored Aug 6, 2019
1 parent 8804020 commit a015978
Showing 4 changed files with 112 additions and 4 deletions.
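
For context, a minimal usage sketch of the new argument as exposed on the client; the dataset and table names below are placeholders, not part of the change:

import pandas
from google.cloud import bigquery

client = bigquery.Client()
dataframe = pandas.DataFrame({"name": ["Monty", "Python"], "age": [100, 60]})

# Placeholder dataset/table; "gzip" is understood by both serialization paths.
table_ref = client.dataset("my_dataset").table("my_table")
load_job = client.load_table_from_dataframe(
    dataframe, table_ref, parquet_compression="gzip"
)
load_job.result()  # wait for the load job to complete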
9 changes: 7 additions & 2 deletions bigquery/google/cloud/bigquery/_pandas_helpers.py
@@ -208,7 +208,7 @@ def dataframe_to_arrow(dataframe, bq_schema):
return pyarrow.Table.from_arrays(arrow_arrays, names=arrow_names)


def dataframe_to_parquet(dataframe, bq_schema, filepath):
def dataframe_to_parquet(dataframe, bq_schema, filepath, parquet_compression="SNAPPY"):
"""Write dataframe as a Parquet file, according to the desired BQ schema.
This function requires the :mod:`pyarrow` package. Arrow is used as an
@@ -222,12 +222,17 @@ def dataframe_to_parquet(dataframe, bq_schema, filepath):
columns in the DataFrame.
filepath (str):
Path to write Parquet file to.
parquet_compression (str):
(optional) The compression codec to use for the
``pyarrow.parquet.write_table`` serializing method. Defaults to
"SNAPPY".
https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html#pyarrow-parquet-write-table
"""
if pyarrow is None:
raise ValueError("pyarrow is required for BigQuery schema conversion.")

arrow_table = dataframe_to_arrow(dataframe, bq_schema)
pyarrow.parquet.write_table(arrow_table, filepath)
pyarrow.parquet.write_table(arrow_table, filepath, compression=parquet_compression)


def _tabledata_list_page_to_arrow(page, column_names, arrow_types):
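
As a sketch only (this helper is private and requires ``pyarrow``), the new keyword can be exercised directly like so; the file path is illustrative:

import pandas
from google.cloud.bigquery import _pandas_helpers, schema

bq_schema = (schema.SchemaField("name", "STRING"),)
dataframe = pandas.DataFrame({"name": ["foo", "bar"]})
# Codec names on this path follow pyarrow's spelling, e.g. "SNAPPY", "GZIP", "ZSTD".
_pandas_helpers.dataframe_to_parquet(
    dataframe, bq_schema, "/tmp/example.parquet", parquet_compression="GZIP"
)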
25 changes: 23 additions & 2 deletions bigquery/google/cloud/bigquery/client.py
@@ -1449,6 +1449,7 @@ def load_table_from_dataframe(
location=None,
project=None,
job_config=None,
parquet_compression="snappy",
):
"""Upload the contents of a table from a pandas DataFrame.
@@ -1491,6 +1492,20 @@
column names matching those of the dataframe. The BigQuery
schema is used to determine the correct data type conversion.
Indexes are not loaded. Requires the :mod:`pyarrow` library.
parquet_compression (str):
[Beta] The compression method to use when temporarily
serializing ``dataframe`` to an intermediate parquet file.
If ``pyarrow`` is available and a job config schema is given,
the argument is passed directly as the ``compression`` argument
to the underlying ``pyarrow.parquet.write_table()`` method (the
default value "snappy" gets converted to uppercase).
https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html#pyarrow-parquet-write-table
If either ``pyarrow`` or the job config schema is missing, the
argument is passed directly as the ``compression`` argument
to the underlying ``DataFrame.to_parquet()`` method.
https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_parquet.html#pandas.DataFrame.to_parquet
Returns:
google.cloud.bigquery.job.LoadJob: A new load job.
@@ -1515,8 +1530,14 @@

try:
if pyarrow and job_config.schema:
if parquet_compression == "snappy": # adjust the default value
parquet_compression = parquet_compression.upper()

_pandas_helpers.dataframe_to_parquet(
dataframe, job_config.schema, tmppath
dataframe,
job_config.schema,
tmppath,
parquet_compression=parquet_compression,
)
else:
if job_config.schema:
Expand All @@ -1527,7 +1548,7 @@ def load_table_from_dataframe(
PendingDeprecationWarning,
stacklevel=2,
)
dataframe.to_parquet(tmppath)
dataframe.to_parquet(tmppath, compression=parquet_compression)

with open(tmppath, "rb") as parquet_file:
return self.load_table_from_file(
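
For reference, a standalone sketch of what the ``pyarrow`` path ultimately performs once the schema is applied; the file path is a placeholder:

import pandas
import pyarrow
import pyarrow.parquet

dataframe = pandas.DataFrame({"name": ["Monty", "Python"], "age": [100, 60]})
arrow_table = pyarrow.Table.from_pandas(dataframe)
# pyarrow expects uppercase codec names, hence the "snappy" -> "SNAPPY" adjustment above.
pyarrow.parquet.write_table(arrow_table, "/tmp/example.parquet", compression="GZIP")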
22 changes: 22 additions & 0 deletions bigquery/tests/unit/test__pandas_helpers.py
@@ -17,6 +17,8 @@
import functools
import warnings

import mock

try:
import pandas
except ImportError: # pragma: NO COVER
@@ -613,3 +615,23 @@ def test_dataframe_to_parquet_w_missing_columns(module_under_test, monkeypatch):
pandas.DataFrame(), (schema.SchemaField("not_found", "STRING"),), None
)
assert "columns in schema must match" in str(exc_context.value)


@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
@pytest.mark.skipif(pyarrow is None, reason="Requires `pyarrow`")
def test_dataframe_to_parquet_compression_method(module_under_test):
bq_schema = (schema.SchemaField("field00", "STRING"),)
dataframe = pandas.DataFrame({"field00": ["foo", "bar"]})

write_table_patch = mock.patch.object(
module_under_test.pyarrow.parquet, "write_table", autospec=True
)

with write_table_patch as fake_write_table:
module_under_test.dataframe_to_parquet(
dataframe, bq_schema, None, parquet_compression="ZSTD"
)

call_args = fake_write_table.call_args
assert call_args is not None
assert call_args.kwargs.get("compression") == "ZSTD"
60 changes: 60 additions & 0 deletions bigquery/tests/unit/test_client.py
@@ -5375,6 +5375,66 @@ def test_load_table_from_dataframe_w_schema_wo_pyarrow(self):
assert sent_config.source_format == job.SourceFormat.PARQUET
assert tuple(sent_config.schema) == schema

@unittest.skipIf(pandas is None, "Requires `pandas`")
@unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
def test_load_table_from_dataframe_w_schema_arrow_custom_compression(self):
from google.cloud.bigquery import job
from google.cloud.bigquery.schema import SchemaField

client = self._make_client()
records = [{"name": "Monty", "age": 100}, {"name": "Python", "age": 60}]
dataframe = pandas.DataFrame(records)
schema = (SchemaField("name", "STRING"), SchemaField("age", "INTEGER"))
job_config = job.LoadJobConfig(schema=schema)

load_patch = mock.patch(
"google.cloud.bigquery.client.Client.load_table_from_file", autospec=True
)
to_parquet_patch = mock.patch(
"google.cloud.bigquery.client._pandas_helpers.dataframe_to_parquet",
autospec=True,
)

with load_patch, to_parquet_patch as fake_to_parquet:
client.load_table_from_dataframe(
dataframe,
self.TABLE_REF,
job_config=job_config,
location=self.LOCATION,
parquet_compression="LZ4",
)

call_args = fake_to_parquet.call_args
assert call_args is not None
assert call_args.kwargs.get("parquet_compression") == "LZ4"

@unittest.skipIf(pandas is None, "Requires `pandas`")
@unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
def test_load_table_from_dataframe_wo_pyarrow_custom_compression(self):
client = self._make_client()
records = [{"name": "Monty", "age": 100}, {"name": "Python", "age": 60}]
dataframe = pandas.DataFrame(records)

load_patch = mock.patch(
"google.cloud.bigquery.client.Client.load_table_from_file", autospec=True
)
pyarrow_patch = mock.patch("google.cloud.bigquery.client.pyarrow", None)
to_parquet_patch = mock.patch.object(
dataframe, "to_parquet", wraps=dataframe.to_parquet
)

with load_patch, pyarrow_patch, to_parquet_patch as to_parquet_spy:
client.load_table_from_dataframe(
dataframe,
self.TABLE_REF,
location=self.LOCATION,
parquet_compression="gzip",
)

call_args = to_parquet_spy.call_args
assert call_args is not None
assert call_args.kwargs.get("compression") == "gzip"

@unittest.skipIf(pandas is None, "Requires `pandas`")
@unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
def test_load_table_from_dataframe_w_nulls(self):
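
Outside the test harness, the no-``pyarrow`` fallback exercised above reduces to pandas' own serializer, roughly as below; the path is a placeholder, and pandas still needs a parquet engine installed:

import pandas

dataframe = pandas.DataFrame({"name": ["Monty", "Python"], "age": [100, 60]})
# DataFrame.to_parquet() takes lowercase codec names such as "snappy" or "gzip".
dataframe.to_parquet("/tmp/example.parquet", compression="gzip")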
