feat(bigquery): allow passing schema as a sequence of dicts (#9550)
* feat(bigquery): add _to_schema_fields() schema helper

* Allow passing schema as dicts in _helpers

* Allow passing schema as dicts in table.py

* Allow passing schema as dicts in job.py

* Import SchemaField directly in several tests

SchemaField should not be imported from bigquery.table, but directly
from where it's defined, so that any changes to the imports in
bigquery.table do not cause unnecessary test failures.

* Allow passing schema as dicts in pandas helpers

* Replace return statement with an else block

* Alter the type spec of values in schema field dict

* Blacken a few files

* Simplify _to_schema_fields() schema helper

* Update docstrings for schema parameter
plamut authored Nov 3, 2019
1 parent 5571911 commit 89eaedb
Showing 11 changed files with 516 additions and 75 deletions.
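Usage sketch (illustrative, not part of the diff): what this change enables from the public API, assuming the rest of the client is unchanged. The project, dataset, table, and field names below are hypothetical.

from google.cloud import bigquery

# Schema fields can now be given as plain mappings whose keys are compatible
# with SchemaField.from_api_repr() ("name", "type", "mode", ...), instead of
# pre-built SchemaField instances.
schema = [
    {"name": "full_name", "type": "STRING", "mode": "REQUIRED"},
    {"name": "age", "type": "INTEGER", "mode": "NULLABLE"},
]

# Hypothetical table ID; the dicts are coerced to SchemaField internally.
table = bigquery.Table("my-project.my_dataset.people", schema=schema)
print(table.schema)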
27 changes: 25 additions & 2 deletions bigquery/google/cloud/bigquery/_helpers.py
@@ -224,21 +224,44 @@ def _row_tuple_from_json(row, schema):
Args:
row (Dict): A JSON response row to be converted.
schema (Tuple): A tuple of :class:`~google.cloud.bigquery.schema.SchemaField`.
schema (Sequence[Union[ \
:class:`~google.cloud.bigquery.schema.SchemaField`, \
Mapping[str, Any] \
]]): Specification of the field types in ``row``.
Returns:
Tuple: A tuple of data converted to native types.
"""
from google.cloud.bigquery.schema import _to_schema_fields

schema = _to_schema_fields(schema)

row_data = []
for field, cell in zip(schema, row["f"]):
row_data.append(_field_from_json(cell["v"], field))
return tuple(row_data)


def _rows_from_json(values, schema):
"""Convert JSON row data to rows with appropriate types."""
"""Convert JSON row data to rows with appropriate types.
Args:
values (Sequence[Dict]): The list of responses (JSON rows) to convert.
schema (Sequence[Union[ \
:class:`~google.cloud.bigquery.schema.SchemaField`, \
Mapping[str, Any] \
]]):
The table's schema. If any item is a mapping, its content must be
compatible with
:meth:`~google.cloud.bigquery.schema.SchemaField.from_api_repr`.
Returns:
List[:class:`~google.cloud.bigquery.Row`]
"""
from google.cloud.bigquery import Row
from google.cloud.bigquery.schema import _to_schema_fields

schema = _to_schema_fields(schema)
field_to_index = _field_to_index_mapping(schema)
return [Row(_row_tuple_from_json(r, schema), field_to_index) for r in values]

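Usage sketch (illustrative, not part of the diff): the private row-conversion helper accepting mappings. The tabledata.list-style row payload and field names are made up for the example.

from google.cloud.bigquery._helpers import _rows_from_json

rows = [{"f": [{"v": "Phred"}, {"v": "32"}]}]
schema = [
    {"name": "full_name", "type": "STRING", "mode": "REQUIRED"},
    {"name": "age", "type": "INTEGER", "mode": "REQUIRED"},
]

# Mappings are converted to SchemaField via _to_schema_fields() before the
# cell values are coerced to native Python types.
converted = _rows_from_json(rows, schema)
print(converted[0].full_name, converted[0].age)  # Phred 32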
58 changes: 46 additions & 12 deletions bigquery/google/cloud/bigquery/_pandas_helpers.py
@@ -239,7 +239,10 @@ def dataframe_to_bq_schema(dataframe, bq_schema):
Args:
dataframe (pandas.DataFrame):
DataFrame for which the client determines the BigQuery schema.
bq_schema (Sequence[google.cloud.bigquery.schema.SchemaField]):
bq_schema (Sequence[Union[ \
:class:`~google.cloud.bigquery.schema.SchemaField`, \
Mapping[str, Any] \
]]):
A BigQuery schema. Use this argument to override the autodetected
type for some or all of the DataFrame columns.
@@ -249,6 +252,7 @@ def dataframe_to_bq_schema(dataframe, bq_schema):
any column cannot be determined.
"""
if bq_schema:
bq_schema = schema._to_schema_fields(bq_schema)
for field in bq_schema:
if field.field_type in schema._STRUCT_TYPES:
raise ValueError(
@@ -297,9 +301,12 @@ def dataframe_to_arrow(dataframe, bq_schema):
Args:
dataframe (pandas.DataFrame):
DataFrame to convert to Arrow table.
bq_schema (Sequence[google.cloud.bigquery.schema.SchemaField]):
Desired BigQuery schema. Number of columns must match number of
columns in the DataFrame.
bq_schema (Sequence[Union[ \
:class:`~google.cloud.bigquery.schema.SchemaField`, \
Mapping[str, Any] \
]]):
Desired BigQuery schema. The number of columns must match the
number of columns in the DataFrame.
Returns:
pyarrow.Table:
@@ -310,6 +317,8 @@
column_and_index_names = set(
name for name, _ in list_columns_and_indexes(dataframe)
)

bq_schema = schema._to_schema_fields(bq_schema)
bq_field_names = set(field.name for field in bq_schema)

extra_fields = bq_field_names - column_and_index_names
@@ -354,7 +363,10 @@ def dataframe_to_parquet(dataframe, bq_schema, filepath, parquet_compression="SN
Args:
dataframe (pandas.DataFrame):
DataFrame to convert to Parquet file.
bq_schema (Sequence[google.cloud.bigquery.schema.SchemaField]):
bq_schema (Sequence[Union[ \
:class:`~google.cloud.bigquery.schema.SchemaField`, \
Mapping[str, Any] \
]]):
Desired BigQuery schema. Number of columns must match number of
columns in the DataFrame.
filepath (str):
@@ -368,6 +380,7 @@
if pyarrow is None:
raise ValueError("pyarrow is required for BigQuery schema conversion.")

bq_schema = schema._to_schema_fields(bq_schema)
arrow_table = dataframe_to_arrow(dataframe, bq_schema)
pyarrow.parquet.write_table(arrow_table, filepath, compression=parquet_compression)

@@ -388,20 +401,24 @@ def _tabledata_list_page_to_arrow(page, column_names, arrow_types):
return pyarrow.RecordBatch.from_arrays(arrays, names=column_names)


def download_arrow_tabledata_list(pages, schema):
def download_arrow_tabledata_list(pages, bq_schema):
"""Use tabledata.list to construct an iterable of RecordBatches.
Args:
pages (Iterator[:class:`google.api_core.page_iterator.Page`]):
An iterator over the result pages.
schema (Sequence[google.cloud.bigquery.schema.SchemaField]):
bq_schema (Sequence[Union[ \
:class:`~google.cloud.bigquery.schema.SchemaField`, \
Mapping[str, Any] \
]]):
A description of the fields in result pages.
Yields:
:class:`pyarrow.RecordBatch`
The next page of records as a ``pyarrow`` record batch.
"""
column_names = bq_to_arrow_schema(schema) or [field.name for field in schema]
arrow_types = [bq_to_arrow_data_type(field) for field in schema]
bq_schema = schema._to_schema_fields(bq_schema)
column_names = bq_to_arrow_schema(bq_schema) or [field.name for field in bq_schema]
arrow_types = [bq_to_arrow_data_type(field) for field in bq_schema]

for page in pages:
yield _tabledata_list_page_to_arrow(page, column_names, arrow_types)
@@ -422,9 +439,26 @@ def _tabledata_list_page_to_dataframe(page, column_names, dtypes):
return pandas.DataFrame(columns, columns=column_names)


def download_dataframe_tabledata_list(pages, schema, dtypes):
"""Use (slower, but free) tabledata.list to construct a DataFrame."""
column_names = [field.name for field in schema]
def download_dataframe_tabledata_list(pages, bq_schema, dtypes):
"""Use (slower, but free) tabledata.list to construct a DataFrame.
Args:
pages (Iterator[:class:`google.api_core.page_iterator.Page`]):
An iterator over the result pages.
bq_schema (Sequence[Union[ \
:class:`~google.cloud.bigquery.schema.SchemaField`, \
Mapping[str, Any] \
]]):
A description of the fields in result pages.
dtypes(Mapping[str, numpy.dtype]):
The types of columns in result data to hint construction of the
resulting DataFrame. Not all column types have to be specified.
Yields:
:class:`pandas.DataFrame`
The next page of records as a ``pandas.DataFrame``.
"""
bq_schema = schema._to_schema_fields(bq_schema)
column_names = [field.name for field in bq_schema]
for page in pages:
yield _tabledata_list_page_to_dataframe(page, column_names, dtypes)

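Usage sketch (illustrative, not part of the diff): the pandas helpers with a dict-based schema. This assumes pandas and pyarrow are installed; the private _pandas_helpers module and the column names below are used only for illustration.

import pandas
from google.cloud.bigquery import _pandas_helpers

dataframe = pandas.DataFrame({"full_name": ["Phred"], "age": [32]})
bq_schema = [
    {"name": "full_name", "type": "STRING", "mode": "REQUIRED"},
    {"name": "age", "type": "INTEGER", "mode": "REQUIRED"},
]

# dataframe_to_arrow() now coerces the mappings itself, so callers no longer
# need to build SchemaField objects before converting a DataFrame.
arrow_table = _pandas_helpers.dataframe_to_arrow(dataframe, bq_schema)
print(arrow_table.schema)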
11 changes: 7 additions & 4 deletions bigquery/google/cloud/bigquery/job.py
@@ -38,6 +38,7 @@
from google.cloud.bigquery.retry import DEFAULT_RETRY
from google.cloud.bigquery.routine import RoutineReference
from google.cloud.bigquery.schema import SchemaField
from google.cloud.bigquery.schema import _to_schema_fields
from google.cloud.bigquery.table import _EmptyRowIterator
from google.cloud.bigquery.table import RangePartitioning
from google.cloud.bigquery.table import _table_arg_to_table_ref
@@ -1225,8 +1226,10 @@ def range_partitioning(self, value):

@property
def schema(self):
"""List[google.cloud.bigquery.schema.SchemaField]: Schema of the
destination table.
"""Sequence[Union[ \
:class:`~google.cloud.bigquery.schema.SchemaField`, \
Mapping[str, Any] \
]]: Schema of the destination table.
See
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationLoad.FIELDS.schema
@@ -1242,8 +1245,8 @@ def schema(self, value):
self._del_sub_prop("schema")
return

if not all(hasattr(field, "to_api_repr") for field in value):
raise ValueError("Schema items must be fields")
value = _to_schema_fields(value)

_helpers._set_sub_prop(
self._properties,
["load", "schema", "fields"],
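Usage sketch (illustrative, not part of the diff): the LoadJobConfig.schema setter shown above now accepts mappings. The field definitions are made up for the example.

from google.cloud import bigquery

job_config = bigquery.LoadJobConfig()

# The setter now routes values through _to_schema_fields() rather than
# requiring every item to expose to_api_repr().
job_config.schema = [
    {"name": "full_name", "type": "STRING", "mode": "REQUIRED"},
    {"name": "age", "type": "INTEGER", "mode": "NULLABLE"},
]
print(job_config.schema)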
35 changes: 35 additions & 0 deletions bigquery/google/cloud/bigquery/schema.py
@@ -14,6 +14,8 @@

"""Schemas for BigQuery tables / queries."""

import collections

from google.cloud.bigquery_v2 import types


@@ -256,3 +258,36 @@ def _build_schema_resource(fields):
Sequence[Dict]: Mappings describing the schema of the supplied fields.
"""
return [field.to_api_repr() for field in fields]


def _to_schema_fields(schema):
"""Coerce `schema` to a list of schema field instances.
Args:
schema(Sequence[Union[ \
:class:`~google.cloud.bigquery.schema.SchemaField`, \
Mapping[str, Any] \
]]):
Table schema to convert. If some items are passed as mappings,
their content must be compatible with
:meth:`~google.cloud.bigquery.schema.SchemaField.from_api_repr`.
Returns:
Sequence[:class:`~google.cloud.bigquery.schema.SchemaField`]
Raises:
Exception: If ``schema`` is not a sequence, or if any item in the
sequence is not a :class:`~google.cloud.bigquery.schema.SchemaField`
instance or a compatible mapping representation of the field.
"""
for field in schema:
if not isinstance(field, (SchemaField, collections.Mapping)):
raise ValueError(
"Schema items must either be fields or compatible "
"mapping representations."
)

return [
field if isinstance(field, SchemaField) else SchemaField.from_api_repr(field)
for field in schema
]
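Usage sketch (illustrative, not part of the diff): how the new private helper handles a mixed input; the field names are illustrative.

from google.cloud.bigquery.schema import SchemaField, _to_schema_fields

mixed = [
    SchemaField("full_name", "STRING", mode="REQUIRED"),
    {"name": "age", "type": "INTEGER", "mode": "NULLABLE"},
]

# SchemaField items pass through unchanged; mappings go through
# SchemaField.from_api_repr(); anything else raises ValueError.
fields = _to_schema_fields(mixed)
assert all(isinstance(field, SchemaField) for field in fields)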
36 changes: 26 additions & 10 deletions bigquery/google/cloud/bigquery/table.py
@@ -51,9 +51,9 @@
import google.cloud._helpers
from google.cloud.bigquery import _helpers
from google.cloud.bigquery import _pandas_helpers
from google.cloud.bigquery.schema import SchemaField
from google.cloud.bigquery.schema import _build_schema_resource
from google.cloud.bigquery.schema import _parse_schema_resource
from google.cloud.bigquery.schema import _to_schema_fields
from google.cloud.bigquery.external_config import ExternalConfig
from google.cloud.bigquery.encryption_configuration import EncryptionConfiguration

@@ -305,8 +305,13 @@ class Table(object):
A pointer to a table. If ``table_ref`` is a string, it must
included a project ID, dataset ID, and table ID, each separated
by ``.``.
schema (List[google.cloud.bigquery.schema.SchemaField]):
The table's schema
schema (Optional[Sequence[Union[ \
:class:`~google.cloud.bigquery.schema.SchemaField`, \
Mapping[str, Any] \
]]]):
The table's schema. If any item is a mapping, its content must be
compatible with
:meth:`~google.cloud.bigquery.schema.SchemaField.from_api_repr`.
"""

_PROPERTY_TO_API_FIELD = {
@@ -369,13 +374,17 @@ def require_partition_filter(self, value):

@property
def schema(self):
"""List[google.cloud.bigquery.schema.SchemaField]: Table's schema.
"""Sequence[Union[ \
:class:`~google.cloud.bigquery.schema.SchemaField`, \
Mapping[str, Any] \
]]:
Table's schema.
Raises:
TypeError: If 'value' is not a sequence
ValueError:
If any item in the sequence is not a
:class:`~google.cloud.bigquery.schema.SchemaField`
Exception:
If ``schema`` is not a sequence, or if any item in the sequence
is not a :class:`~google.cloud.bigquery.schema.SchemaField`
instance or a compatible mapping representation of the field.
"""
prop = self._properties.get("schema")
if not prop:
@@ -387,9 +396,8 @@ def schema(self):
def schema(self, value):
if value is None:
self._properties["schema"] = None
elif not all(isinstance(field, SchemaField) for field in value):
raise ValueError("Schema items must be fields")
else:
value = _to_schema_fields(value)
self._properties["schema"] = {"fields": _build_schema_resource(value)}

@property
@@ -1284,6 +1292,13 @@ class RowIterator(HTTPIterator):
api_request (Callable[google.cloud._http.JSONConnection.api_request]):
The function to use to make API requests.
path (str): The method path to query for the list of items.
schema (Sequence[Union[ \
:class:`~google.cloud.bigquery.schema.SchemaField`, \
Mapping[str, Any] \
]]):
The table's schema. If any item is a mapping, its content must be
compatible with
:meth:`~google.cloud.bigquery.schema.SchemaField.from_api_repr`.
page_token (str): A token identifying a page in a result set to start
fetching results from.
max_results (int, optional): The maximum number of results to fetch.
@@ -1328,6 +1343,7 @@ def __init__(
page_start=_rows_page_start,
next_token="pageToken",
)
schema = _to_schema_fields(schema)
self._field_to_index = _helpers._field_to_index_mapping(schema)
self._page_size = page_size
self._preserve_order = False
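Usage sketch (illustrative, not part of the diff): appending a column to an existing table's schema with a plain dict. The table ID is hypothetical and working credentials are assumed.

from google.cloud import bigquery

client = bigquery.Client()
table = client.get_table("my-project.my_dataset.people")  # hypothetical ID

# Existing SchemaField entries and new mappings can be mixed; the schema
# setter coerces everything through _to_schema_fields().
table.schema = list(table.schema) + [
    {"name": "favorite_color", "type": "STRING", "mode": "NULLABLE"}
]
table = client.update_table(table, ["schema"])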
28 changes: 26 additions & 2 deletions bigquery/tests/unit/test__helpers.py
@@ -17,6 +17,8 @@
import decimal
import unittest

import mock


class Test_not_null(unittest.TestCase):
def _call_fut(self, value, field):
@@ -412,7 +414,8 @@ class Test_row_tuple_from_json(unittest.TestCase):
def _call_fut(self, row, schema):
from google.cloud.bigquery._helpers import _row_tuple_from_json

return _row_tuple_from_json(row, schema)
with _field_isinstance_patcher():
return _row_tuple_from_json(row, schema)

def test_w_single_scalar_column(self):
# SELECT 1 AS col
@@ -529,7 +532,8 @@ class Test_rows_from_json(unittest.TestCase):
def _call_fut(self, rows, schema):
from google.cloud.bigquery._helpers import _rows_from_json

return _rows_from_json(rows, schema)
with _field_isinstance_patcher():
return _rows_from_json(rows, schema)

def test_w_record_subfield(self):
from google.cloud.bigquery.table import Row
@@ -1023,3 +1027,23 @@ def __init__(self, mode, name="unknown", field_type="UNKNOWN", fields=()):
self.name = name
self.field_type = field_type
self.fields = fields


def _field_isinstance_patcher():
"""A patcher that makes _Field instances seem like SchemaField instances.
"""
from google.cloud.bigquery.schema import SchemaField

def fake_isinstance(instance, target_class):
if instance.__class__.__name__ != "_Field":
return isinstance(instance, target_class) # pragma: NO COVER

# pretend that _Field() instances are actually instances of SchemaField
return target_class is SchemaField or (
isinstance(target_class, tuple) and SchemaField in target_class
)

patcher = mock.patch(
"google.cloud.bigquery.schema.isinstance", side_effect=fake_isinstance
)
return patcher