BigQuery: Add ability to get query results as a Pandas dataframe. (#4354)
alixhami authored and Jon Wayne Parrott committed Dec 4, 2017
1 parent ad1174b commit 3511e87
Showing 9 changed files with 437 additions and 16 deletions.
2 changes: 1 addition & 1 deletion bigquery/google/cloud/bigquery/_helpers.py
@@ -468,7 +468,7 @@ def _rows_page_start(iterator, page, response):
     total_rows = response.get('totalRows')
     if total_rows is not None:
         total_rows = int(total_rows)
-    iterator.total_rows = total_rows
+    iterator._total_rows = total_rows
 # pylint: enable=unused-argument
 
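Note: the page-start hook now writes to the private `_total_rows` attribute because `RowIterator` (added in table.py below) exposes `total_rows` as a read-only property backed by it.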
17 changes: 5 additions & 12 deletions bigquery/google/cloud/bigquery/client.py
@@ -33,9 +33,6 @@
 
 from google.cloud.bigquery._helpers import DEFAULT_RETRY
 from google.cloud.bigquery._helpers import _SCALAR_VALUE_TO_JSON_ROW
-from google.cloud.bigquery._helpers import _field_to_index_mapping
-from google.cloud.bigquery._helpers import _item_to_row
-from google.cloud.bigquery._helpers import _rows_page_start
 from google.cloud.bigquery._helpers import _snake_to_camel_case
 from google.cloud.bigquery._http import Connection
 from google.cloud.bigquery.dataset import Dataset
@@ -48,6 +45,7 @@
 from google.cloud.bigquery.table import Table
 from google.cloud.bigquery.table import TableListItem
 from google.cloud.bigquery.table import TableReference
+from google.cloud.bigquery.table import RowIterator
 from google.cloud.bigquery.table import _TABLE_HAS_NO_SCHEMA
 from google.cloud.bigquery.table import _row_from_mapping
 
@@ -1189,7 +1187,7 @@ def list_rows(self, table, selected_fields=None, max_results=None,
         :type retry: :class:`google.api_core.retry.Retry`
         :param retry: (Optional) How to retry the RPC.
 
-        :rtype: :class:`~google.api_core.page_iterator.Iterator`
+        :rtype: :class:`~google.cloud.bigquery.table.RowIterator`
         :returns: Iterator of row data
             :class:`~google.cloud.bigquery.table.Row`-s. During each
             page, the iterator will have the ``total_rows`` attribute
@@ -1217,20 +1215,15 @@ def list_rows(self, table, selected_fields=None, max_results=None,
         if start_index is not None:
             params['startIndex'] = start_index
 
-        iterator = page_iterator.HTTPIterator(
+        row_iterator = RowIterator(
             client=self,
             api_request=functools.partial(self._call_api, retry),
             path='%s/data' % (table.path,),
-            item_to_value=_item_to_row,
-            items_key='rows',
+            schema=schema,
             page_token=page_token,
-            next_token='pageToken',
             max_results=max_results,
-            page_start=_rows_page_start,
             extra_params=params)
-        iterator.schema = schema
-        iterator._field_to_index = _field_to_index_mapping(schema)
-        return iterator
+        return row_iterator
 
     def list_partitions(self, table, retry=DEFAULT_RETRY):
         """List the partitions in a table.
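With this change, `list_rows` hands back a `RowIterator` that carries the table schema and the running row count, instead of a bare `HTTPIterator` patched up after construction. A minimal usage sketch; the dataset and table IDs are hypothetical and default credentials are assumed:

    from google.cloud import bigquery

    client = bigquery.Client()
    table_ref = client.dataset('my_dataset').table('my_table')  # hypothetical IDs
    table = client.get_table(table_ref)  # fetches the table, including its schema

    rows = client.list_rows(table)  # RowIterator
    for row in rows:
        print(row.values())
    print(rows.total_rows)  # populated once the first page arrives
    print([field.name for field in rows.schema])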
15 changes: 14 additions & 1 deletion bigquery/google/cloud/bigquery/job.py
@@ -1929,7 +1929,7 @@ def result(self, timeout=None, retry=DEFAULT_RETRY):
         :type retry: :class:`google.api_core.retry.Retry`
         :param retry: (Optional) How to retry the call that retrieves rows.
 
-        :rtype: :class:`~google.api_core.page_iterator.Iterator`
+        :rtype: :class:`~google.cloud.bigquery.table.RowIterator`
         :returns:
             Iterator of row data :class:`~google.cloud.bigquery.table.Row`-s.
             During each page, the iterator will have the ``total_rows``
@@ -1949,6 +1949,19 @@ def result(self, timeout=None, retry=DEFAULT_RETRY):
         return self._client.list_rows(dest_table, selected_fields=schema,
                                       retry=retry)
 
+    def to_dataframe(self):
+        """Return a pandas DataFrame from a QueryJob
+
+        Returns:
+            A :class:`~pandas.DataFrame` populated with row data and column
+            headers from the query results. The column headers are derived
+            from the destination table's schema.
+
+        Raises:
+            ValueError: If the `pandas` library cannot be imported.
+        """
+        return self.result().to_dataframe()
+
     def __iter__(self):
         return iter(self.result())
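`QueryJob.to_dataframe` is a thin wrapper: it blocks on `result()` and delegates to the iterator's `to_dataframe`. A sketch of the intended call pattern; the SQL and table path are placeholders:

    from google.cloud import bigquery

    client = bigquery.Client()
    query_job = client.query(
        'SELECT name, age FROM `my_project.my_dataset.my_table`')  # hypothetical table

    df = query_job.to_dataframe()  # waits for job completion, then fetches all rows
    print(df.head())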
78 changes: 78 additions & 0 deletions bigquery/google/cloud/bigquery/table.py
@@ -21,10 +21,19 @@
 import operator
 
 import six
+try:
+    import pandas
+except ImportError:  # pragma: NO COVER
+    pandas = None
+
+from google.api_core.page_iterator import HTTPIterator
 
 from google.cloud._helpers import _datetime_from_microseconds
 from google.cloud._helpers import _millis_from_datetime
+from google.cloud.bigquery._helpers import _item_to_row
+from google.cloud.bigquery._helpers import _rows_page_start
 from google.cloud.bigquery._helpers import _snake_to_camel_case
+from google.cloud.bigquery._helpers import _field_to_index_mapping
 from google.cloud.bigquery.schema import SchemaField
 from google.cloud.bigquery.schema import _build_schema_resource
 from google.cloud.bigquery.schema import _parse_schema_resource
@@ -1023,3 +1032,72 @@ def __repr__(self):
                        key=operator.itemgetter(1))
         f2i = '{' + ', '.join('%r: %d' % item for item in items) + '}'
         return 'Row({}, {})'.format(self._xxx_values, f2i)
+
+
+class RowIterator(HTTPIterator):
+    """A class for iterating through HTTP/JSON API row list responses.
+
+    Args:
+        client (google.cloud.bigquery.Client): The API client.
+        api_request (Callable[google.cloud._http.JSONConnection.api_request]):
+            The function to use to make API requests.
+        path (str): The method path to query for the list of items.
+        page_token (str): A token identifying a page in a result set to start
+            fetching results from.
+        max_results (int): The maximum number of results to fetch.
+        extra_params (dict): Extra query string parameters for the API call.
+
+    .. autoattribute:: pages
+    """
+
+    def __init__(self, client, api_request, path, schema, page_token=None,
+                 max_results=None, extra_params=None):
+        super(RowIterator, self).__init__(
+            client, api_request, path, item_to_value=_item_to_row,
+            items_key='rows', page_token=page_token, max_results=max_results,
+            extra_params=extra_params, page_start=_rows_page_start,
+            next_token='pageToken')
+        self._schema = schema
+        self._field_to_index = _field_to_index_mapping(schema)
+        self._total_rows = None
+
+    @property
+    def schema(self):
+        """Schema for the table containing the rows
+
+        Returns:
+            list of :class:`~google.cloud.bigquery.schema.SchemaField`:
+                fields describing the schema
+        """
+        return list(self._schema)
+
+    @property
+    def total_rows(self):
+        """The total number of rows in the table.
+
+        Returns:
+            int: the row count.
+        """
+        return self._total_rows
+
+    def to_dataframe(self):
+        """Create a pandas DataFrame from the query results.
+
+        Returns:
+            A :class:`~pandas.DataFrame` populated with row data and column
+            headers from the query results. The column headers are derived
+            from the destination table's schema.
+
+        Raises:
+            ValueError: If the `pandas` library cannot be imported.
+        """
+        if pandas is None:
+            raise ValueError('The pandas library is not installed, please '
+                             'install pandas to use the to_dataframe() '
+                             'function.')
+
+        column_headers = [field.name for field in self.schema]
+        rows = [row.values() for row in iter(self)]
+
+        return pandas.DataFrame(rows, columns=column_headers)
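The conversion itself is deliberately simple: column headers come from the schema field names and each `Row` contributes its `values()` tuple. Spelled out as a sketch, assuming `rows` is a `RowIterator` returned by `list_rows` or `result()`:

    import pandas

    column_headers = [field.name for field in rows.schema]  # names from the table schema
    data = [row.values() for row in rows]  # one tuple per row; consumes the iterator
    df = pandas.DataFrame(data, columns=column_headers)

Note that the list comprehension drains every page before the frame is built, so the full result set is materialized in memory.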
4 changes: 2 additions & 2 deletions bigquery/nox.py
@@ -36,7 +36,7 @@ def default(session):
     """
     # Install all test dependencies, then install this package in-place.
     session.install('mock', 'pytest', 'pytest-cov', *LOCAL_DEPS)
-    session.install('-e', '.')
+    session.install('-e', '.[pandas]')
 
     # Run py.test against the unit tests.
     session.run(
@@ -89,7 +89,7 @@ def system(session, py):
         os.path.join('..', 'storage'),
         os.path.join('..', 'test_utils'),
     )
-    session.install('-e', '.')
+    session.install('-e', '.[pandas]')
 
     # Run py.test against the system tests.
     session.run(
5 changes: 5 additions & 0 deletions bigquery/setup.py
@@ -58,6 +58,10 @@
     'requests >= 2.18.0',
 ]
 
+EXTRAS_REQUIREMENTS = {
+    'pandas': ['pandas >= 0.17.1'],
+}
+
 setup(
     name='google-cloud-bigquery',
     version='0.28.1.dev1',
@@ -69,5 +73,6 @@
     ],
     packages=find_packages(exclude=('tests*',)),
     install_requires=REQUIREMENTS,
+    extras_require=EXTRAS_REQUIREMENTS,
     **SETUP_BASE
 )
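Declaring pandas under `extras_require` keeps it optional: a plain install stays lightweight, while `pip install 'google-cloud-bigquery[pandas]'` pulls in `pandas >= 0.17.1`. Without the extra, `to_dataframe()` raises the `ValueError` guarded for in table.py above.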
76 changes: 76 additions & 0 deletions bigquery/tests/system.py
@@ -24,6 +24,10 @@
 import uuid
 
 import six
+try:
+    import pandas
+except ImportError:  # pragma: NO COVER
+    pandas = None
 
 from google.api_core.exceptions import PreconditionFailed
 from google.cloud import bigquery
@@ -1244,6 +1248,28 @@ def test_query_iter(self):
         row_tuples = [r.values() for r in query_job]
         self.assertEqual(row_tuples, [(1,)])
 
+    @unittest.skipIf(pandas is None, 'Requires `pandas`')
+    def test_query_results_to_dataframe(self):
+        QUERY = """
+            SELECT id, author, time_ts, dead
+            from `bigquery-public-data.hacker_news.comments`
+            LIMIT 10
+        """
+
+        df = Config.CLIENT.query(QUERY).result().to_dataframe()
+
+        self.assertIsInstance(df, pandas.DataFrame)
+        self.assertEqual(len(df), 10)  # verify the number of rows
+        column_names = ['id', 'author', 'time_ts', 'dead']
+        self.assertEqual(list(df), column_names)  # verify the column names
+        exp_datatypes = {'id': int, 'author': str,
+                         'time_ts': pandas.Timestamp, 'dead': bool}
+        for index, row in df.iterrows():
+            for col in column_names:
+                # all the schema fields are nullable, so None is acceptable
+                if not row[col] is None:
+                    self.assertIsInstance(row[col], exp_datatypes[col])
+
     def test_query_table_def(self):
         gs_url = self._write_csv_to_storage(
             'bq_external_test' + unique_resource_id(), 'person_ages.csv',
@@ -1419,6 +1445,56 @@ def test_create_table_rows_fetch_nested_schema(self):
         e_favtime = datetime.datetime(*parts[0:6])
         self.assertEqual(found[7], e_favtime)
 
+    def _fetch_dataframe(self, query):
+        return Config.CLIENT.query(query).result().to_dataframe()
+
+    @unittest.skipIf(pandas is None, 'Requires `pandas`')
+    def test_nested_table_to_dataframe(self):
+        SF = bigquery.SchemaField
+        schema = [
+            SF('string_col', 'STRING', mode='NULLABLE'),
+            SF('record_col', 'RECORD', mode='NULLABLE', fields=[
+                SF('nested_string', 'STRING', mode='NULLABLE'),
+                SF('nested_repeated', 'INTEGER', mode='REPEATED'),
+                SF('nested_record', 'RECORD', mode='NULLABLE', fields=[
+                    SF('nested_nested_string', 'STRING', mode='NULLABLE'),
+                ]),
+            ]),
+        ]
+        record = {
+            'nested_string': 'another string value',
+            'nested_repeated': [0, 1, 2],
+            'nested_record': {'nested_nested_string': 'some deep insight'},
+        }
+        to_insert = [
+            ('Some value', record)
+        ]
+        table_id = 'test_table'
+        dataset = self.temp_dataset(_make_dataset_id('nested_df'))
+        table_arg = Table(dataset.table(table_id), schema=schema)
+        table = retry_403(Config.CLIENT.create_table)(table_arg)
+        self.to_delete.insert(0, table)
+        Config.CLIENT.create_rows(table, to_insert)
+        QUERY = 'SELECT * from `{}.{}.{}`'.format(
+            Config.CLIENT.project, dataset.dataset_id, table_id)
+
+        retry = RetryResult(_has_rows, max_tries=8)
+        df = retry(self._fetch_dataframe)(QUERY)
+
+        self.assertIsInstance(df, pandas.DataFrame)
+        self.assertEqual(len(df), 1)  # verify the number of rows
+        exp_columns = ['string_col', 'record_col']
+        self.assertEqual(list(df), exp_columns)  # verify the column names
+        row = df.iloc[0]
+        # verify the row content
+        self.assertEqual(row['string_col'], 'Some value')
+        self.assertEqual(row['record_col'], record)
+        # verify that nested data can be accessed with indices/keys
+        self.assertEqual(row['record_col']['nested_repeated'][0], 0)
+        self.assertEqual(
+            row['record_col']['nested_record']['nested_nested_string'],
+            'some deep insight')
+
     def temp_dataset(self, dataset_id):
         dataset = retry_403(Config.CLIENT.create_dataset)(
             Dataset(Config.CLIENT.dataset(dataset_id)))
39 changes: 39 additions & 0 deletions bigquery/tests/unit/test_job.py
@@ -16,6 +16,10 @@
 
 from six.moves import http_client
 import unittest
+try:
+    import pandas
+except ImportError:  # pragma: NO COVER
+    pandas = None
 
 from google.cloud.bigquery.job import ExtractJobConfig, CopyJobConfig
 from google.cloud.bigquery.job import LoadJobConfig
@@ -2720,6 +2724,41 @@ def test_reload_w_alternate_client(self):
         self.assertEqual(req['path'], PATH)
         self._verifyResourceProperties(job, RESOURCE)
 
+    @unittest.skipIf(pandas is None, 'Requires `pandas`')
+    def test_to_dataframe(self):
+        begun_resource = self._make_resource()
+        query_resource = {
+            'jobComplete': True,
+            'jobReference': {
+                'projectId': self.PROJECT,
+                'jobId': self.JOB_ID,
+            },
+            'schema': {
+                'fields': [
+                    {'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'},
+                    {'name': 'age', 'type': 'INTEGER', 'mode': 'NULLABLE'},
+                ],
+            },
+            'rows': [
+                {'f': [{'v': 'Phred Phlyntstone'}, {'v': '32'}]},
+                {'f': [{'v': 'Bharney Rhubble'}, {'v': '33'}]},
+                {'f': [{'v': 'Wylma Phlyntstone'}, {'v': '29'}]},
+                {'f': [{'v': 'Bhettye Rhubble'}, {'v': '27'}]},
+            ],
+        }
+        done_resource = copy.deepcopy(begun_resource)
+        done_resource['status'] = {'state': 'DONE'}
+        connection = _Connection(
+            begun_resource, query_resource, done_resource, query_resource)
+        client = _make_client(project=self.PROJECT, connection=connection)
+        job = self._make_one(self.JOB_ID, self.QUERY, client)
+
+        df = job.to_dataframe()
+
+        self.assertIsInstance(df, pandas.DataFrame)
+        self.assertEqual(len(df), 4)  # verify the number of rows
+        self.assertEqual(list(df), ['name', 'age'])  # verify the column names
+
     def test_iter(self):
         import types