bigquery: add copy functionality (#127)
This commit adds a `copy` method to the
`parsons.google.google_bigquery.GoogleBigQuery` connector. The
`copy` method can be used to load a Parsons table into a BigQuery
table by uploading the file to Google Cloud Storage.
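
A minimal usage sketch of the new method (the dataset, table, and bucket names below are hypothetical; it assumes Google application credentials are already configured, and `GCS_TEMP_BUCKET` can be set in the environment instead of passing `tmp_gcs_bucket`):

from parsons import Table
from parsons.google.google_bigquery import GoogleBigQuery

# Any Parsons Table will do; this one is example data.
tbl = Table([{'first': 'Jane', 'last': 'Doe'}, {'first': 'John', 'last': 'Smith'}])

bq = GoogleBigQuery()

# Stages the table as a CSV in the temp GCS bucket, loads it into
# my_dataset.my_table, then deletes the temporary blob.
bq.copy(tbl, 'my_dataset', 'my_table', if_exists='append',
        tmp_gcs_bucket='my-temp-bucket')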
Eliot Stone authored Dec 20, 2019
1 parent b376340 commit 3e65128
Showing 3 changed files with 285 additions and 11 deletions.
116 changes: 114 additions & 2 deletions parsons/google/google_bigquery.py
@@ -1,9 +1,13 @@
from google.cloud.bigquery import Client
from google.cloud import bigquery
from google.cloud import exceptions
from parsons import Table
from parsons.google.utitities import setup_google_application_credentials
from parsons.google.google_cloud_storage import GoogleCloudStorage
from parsons.utilities import check_env
from parsons.utilities.files import create_temp_file
import petl
import pickle
import uuid


class GoogleBigQuery:
@@ -34,6 +38,8 @@ class GoogleBigQuery:
"""

def __init__(self, app_creds=None, project=None, location=None):
self.app_creds = app_creds

setup_google_application_credentials(app_creds)

self.project = project
@@ -44,6 +50,90 @@ def __init__(self, app_creds=None, project=None, location=None):
        # This attribute will be used to hold the client once we have created it.
        self._client = None

    def copy(self, table_obj, dataset_name, table_name, if_exists='fail',
             tmp_gcs_bucket=None, gcs_client=None, job_config=None, **load_kwargs):
"""
Copy a :ref:`parsons-table` into Google BigQuery via Google Cloud Storage.
`Args:`
table_obj: obj
The Parsons Table to copy into BigQuery.
dataset_name: str
The dataset name to load the data into.
table_name: str
The table name to load the data into.
if_exists: str
If the table already exists, either ``fail``, ``append``, ``drop``
or ``truncate`` the table.
temp_gcs_bucket: str
The name of the Google Cloud Storage bucket to use to stage the data to load
into BigQuery. Required if `GCS_TEMP_BUCKET` is not specified.
gcs_client: object
The GoogleCloudStorage Connector to use for loading data into Google Cloud Storage.
job_config: object
A LoadJobConfig object to provide to the underlying call to load_table_from_uri
on the BigQuery client. The function will create its own if not provided.
**load_kwargs: kwargs
Arguments to pass to the underlying load_table_from_uri call on the BigQuery
client.
"""
        tmp_gcs_bucket = check_env.check('GCS_TEMP_BUCKET', tmp_gcs_bucket)

        if if_exists not in ['fail', 'truncate', 'append', 'drop']:
            raise ValueError(f'Unexpected value for if_exists: {if_exists}, must be one of '
                             '"append", "drop", "truncate", or "fail"')

        table_exists = self.table_exists(dataset_name, table_name)

        if not job_config:
            job_config = bigquery.LoadJobConfig()
            job_config.autodetect = True

        job_config.skip_leading_rows = 1
        job_config.source_format = bigquery.SourceFormat.CSV
        job_config.write_disposition = bigquery.WriteDisposition.WRITE_EMPTY
        job_config.create_disposition = bigquery.CreateDisposition.CREATE_IF_NEEDED

        dataset_ref = self.client.dataset(dataset_name)

        if table_exists:
            if if_exists == 'fail':
                raise ValueError('Table already exists.')
            elif if_exists == 'drop':
                self.delete_table(dataset_name, table_name)
            elif if_exists == 'append':
                job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND
            elif if_exists == 'truncate':
                job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE

        gcs_client = gcs_client or GoogleCloudStorage()
        temp_blob_name = f'{uuid.uuid4()}.csv'
        temp_blob_uri = gcs_client.upload_table(table_obj, tmp_gcs_bucket, temp_blob_name)

        # load CSV from Cloud Storage into BigQuery
        try:
            load_job = self.client.load_table_from_uri(
                temp_blob_uri, dataset_ref.table(table_name),
                job_config=job_config, **load_kwargs,
            )
            load_job.result()
        finally:
            gcs_client.delete_blob(tmp_gcs_bucket, temp_blob_name)

    def delete_table(self, dataset_name, table_name):
        """
        Delete a BigQuery table.

        `Args:`
            dataset_name: str
                The name of the dataset that the table lives in.
            table_name: str
                The name of the table to delete.
        """
        dataset_ref = self.client.dataset(dataset_name)
        table_ref = dataset_ref.table(table_name)
        self.client.delete_table(table_ref)

    def query(self, sql):
        """
        Run a BigQuery query and return the results as a Parsons table.
@@ -87,6 +177,28 @@ def query(self, sql):

        return final_table

    def table_exists(self, dataset_name, table_name):
        """
        Check whether the Google BigQuery table exists in the specified dataset.

        `Args:`
            dataset_name: str
                The name of the BigQuery dataset to check in.
            table_name: str
                The name of the BigQuery table to check for.
        `Returns:`
            bool
                True if the table exists in the specified dataset, False otherwise.
        """
        dataset = self.client.dataset(dataset_name)
        table_ref = dataset.table(table_name)
        try:
            self.client.get_table(table_ref)
        except exceptions.NotFound:
            return False

        return True

    @property
    def client(self):
        """
@@ -97,6 +209,6 @@ def client(self):
"""
if not self._client:
# Create a BigQuery client to use to make the query
self._client = Client(project=self.project, location=self.location)
self._client = bigquery.Client(project=self.project, location=self.location)

return self._client
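
For reference, a short sketch of how the other new helpers added here could be combined (dataset and table names are hypothetical):

from parsons.google.google_bigquery import GoogleBigQuery

bq = GoogleBigQuery()

# Drop the table only if it already exists in the dataset.
if bq.table_exists('my_dataset', 'my_table'):
    bq.delete_table('my_dataset', 'my_table')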
34 changes: 34 additions & 0 deletions parsons/google/google_cloud_storage.py
@@ -258,3 +258,37 @@ def delete_blob(self, bucket_name, blob_name):
        blob = self.get_blob(bucket_name, blob_name)
        blob.delete()
        logger.info(f'{blob_name} blob in {bucket_name} bucket deleted.')

    def upload_table(self, table, bucket_name, blob_name, data_type='csv', default_acl=None):
        """
        Load the data from a Parsons table into a blob.

        `Args:`
            table: obj
                A :ref:`parsons-table`
            bucket_name: str
                The name of the bucket to upload the data into.
            blob_name: str
                The name of the blob to upload the data into.
            data_type: str
                The file format to use when writing the data. One of: `csv` or `json`.
            default_acl: str
                An optional predefined ACL to apply to the uploaded blob, passed through
                to the underlying `upload_from_filename` call.
        """
        bucket = storage.Bucket(self.client, name=bucket_name)
        blob = storage.Blob(blob_name, bucket)

        if data_type == 'csv':
            local_file = table.to_csv()
            content_type = 'text/csv'
        elif data_type == 'json':
            local_file = table.to_json()
            content_type = 'application/json'
        else:
            raise ValueError(f'Unknown data_type value ({data_type}): must be one of: csv or json')

        try:
            blob.upload_from_filename(local_file, content_type=content_type, client=self.client,
                                      predefined_acl=default_acl)
        finally:
            files.close_temp_file(local_file)

        return f'gs://{bucket_name}/{blob_name}'
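
A rough sketch of using the new upload_table helper on its own (bucket and blob names are hypothetical; assumes a configured GoogleCloudStorage connector):

from parsons import Table
from parsons.google.google_cloud_storage import GoogleCloudStorage

gcs = GoogleCloudStorage()
tbl = Table([{'id': 1, 'name': 'example'}])

# Writes the table to a local temp CSV, uploads it to the bucket, and
# returns the gs:// URI of the new blob.
uri = gcs.upload_table(tbl, 'my-temp-bucket', 'exports/example.csv')
print(uri)  # gs://my-temp-bucket/exports/example.csv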
146 changes: 137 additions & 9 deletions test/test_google/test_google_bigquery.py
@@ -1,7 +1,10 @@
import os
import unittest
import unittest.mock as mock
from google.cloud import bigquery
from google.cloud import exceptions
from parsons.google.google_bigquery import GoogleBigQuery
from parsons import Table


# Test class to fake the RowIterator interface for BigQuery job results
@@ -15,14 +18,15 @@ def __iter__(self):


class TestGoogleBigQuery(unittest.TestCase):
    def test_query(self):
    def setUp(self):
        os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = 'foo'
        self.tmp_gcs_bucket = 'tmp'

    def test_query(self):
        query_string = 'select * from table'

        # Pass the mock class into our GoogleBigQuery constructor
        bq = GoogleBigQuery()
        bq._client = self._build_mock_client([{'one': 1, 'two': 2}])
        bq = self._build_mock_client_for_querying([{'one': 1, 'two': 2}])

        # Run a query against our parsons GoogleBigQuery class
        result = bq.query(query_string)
@@ -33,21 +37,123 @@ def test_query(self):
        self.assertEqual(result[0], {'one': 1, 'two': 2})

    def test_query__no_results(self):
        os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = 'foo'

        query_string = 'select * from table'

        # Pass the mock class into our GoogleBigQuery constructor
        bq = GoogleBigQuery()
        bq._client = self._build_mock_client([])
        bq = self._build_mock_client_for_querying([])

        # Run a query against our parsons GoogleBigQuery class
        result = bq.query(query_string)

        # Check our return value
        self.assertEqual(result, None)

    def _build_mock_client(self, results):
    def test_copy(self):
        # setup dependencies / inputs
        tmp_blob_uri = 'gs://tmp/file'

        # set up object under test
        gcs_client = self._build_mock_cloud_storage_client(tmp_blob_uri)
        tbl = self.default_table
        bq = self._build_mock_client_for_copying(table_exists=False)

        # call the method being tested
        bq.copy(tbl, 'dataset', 'table', tmp_gcs_bucket=self.tmp_gcs_bucket,
                gcs_client=gcs_client)

        # check that the method did the right things
        self.assertEqual(gcs_client.upload_table.call_count, 1)
        upload_call_args = gcs_client.upload_table.call_args
        self.assertEqual(upload_call_args[0][0], tbl)
        self.assertEqual(upload_call_args[0][1], self.tmp_gcs_bucket)
        tmp_blob_name = upload_call_args[0][2]

        self.assertEqual(bq.client.load_table_from_uri.call_count, 1)
        load_call_args = bq.client.load_table_from_uri.call_args
        self.assertEqual(load_call_args[0][0], tmp_blob_uri)

        job_config = load_call_args[1]['job_config']
        self.assertEqual(job_config.write_disposition,
                         bigquery.WriteDisposition.WRITE_EMPTY)

        # make sure we cleaned up the temp file
        self.assertEqual(gcs_client.delete_blob.call_count, 1)
        delete_call_args = gcs_client.delete_blob.call_args
        self.assertEqual(delete_call_args[0][0], self.tmp_gcs_bucket)
        self.assertEqual(delete_call_args[0][1], tmp_blob_name)

    def test_copy__if_exists_truncate(self):
        gcs_client = self._build_mock_cloud_storage_client()
        # set up object under test
        bq = self._build_mock_client_for_copying()

        # call the method being tested
        bq.copy(self.default_table, 'dataset', 'table', tmp_gcs_bucket=self.tmp_gcs_bucket,
                if_exists='truncate', gcs_client=gcs_client)

        # check that the method did the right things
        call_args = bq.client.load_table_from_uri.call_args
        job_config = call_args[1]['job_config']
        self.assertEqual(job_config.write_disposition,
                         bigquery.WriteDisposition.WRITE_TRUNCATE)

        # make sure we cleaned up the temp file
        self.assertEqual(gcs_client.delete_blob.call_count, 1)

    def test_copy__if_exists_append(self):
        gcs_client = self._build_mock_cloud_storage_client()
        # set up object under test
        bq = self._build_mock_client_for_copying()

        # call the method being tested
        bq.copy(self.default_table, 'dataset', 'table', tmp_gcs_bucket=self.tmp_gcs_bucket,
                if_exists='append', gcs_client=gcs_client)

        # check that the method did the right things
        call_args = bq.client.load_table_from_uri.call_args
        job_config = call_args[1]['job_config']
        self.assertEqual(job_config.write_disposition,
                         bigquery.WriteDisposition.WRITE_APPEND)

        # make sure we cleaned up the temp file
        self.assertEqual(gcs_client.delete_blob.call_count, 1)

    def test_copy__if_exists_fail(self):
        # set up object under test
        bq = self._build_mock_client_for_copying()

        # call the method being tested
        with self.assertRaises(Exception):
            bq.copy(self.default_table, 'dataset', 'table', tmp_gcs_bucket=self.tmp_gcs_bucket,
                    gcs_client=self._build_mock_cloud_storage_client())

    def test_copy__if_exists_drop(self):
        gcs_client = self._build_mock_cloud_storage_client()
        # set up object under test
        bq = self._build_mock_client_for_copying()

        # call the method being tested
        bq.copy(self.default_table, 'dataset', 'table', tmp_gcs_bucket=self.tmp_gcs_bucket,
                if_exists='drop', gcs_client=gcs_client)

        # check that we tried to delete the table
        self.assertEqual(bq.client.delete_table.call_count, 1)

        # make sure we cleaned up the temp file
        self.assertEqual(gcs_client.delete_blob.call_count, 1)

    def test_copy__bad_if_exists(self):
        gcs_client = self._build_mock_cloud_storage_client()

        # set up object under test
        bq = self._build_mock_client_for_copying()

        # call the method being tested
        with self.assertRaises(ValueError):
            bq.copy(self.default_table, 'dataset', 'table', tmp_gcs_bucket=self.tmp_gcs_bucket,
                    if_exists='foo', gcs_client=gcs_client)

    def _build_mock_client_for_querying(self, results):
        # Create a mock that will play the role of the query job
        query_job = mock.MagicMock()
        query_job.result.return_value = FakeResults(results)
@@ -56,4 +162,26 @@ def _build_mock_client(self, results):
        client = mock.MagicMock()
        client.query.return_value = query_job

        return client
        bq = GoogleBigQuery()
        bq._client = client
        return bq

    def _build_mock_client_for_copying(self, table_exists=True):
        bq_client = mock.MagicMock()
        if not table_exists:
            bq_client.get_table.side_effect = exceptions.NotFound('not found')
        bq = GoogleBigQuery()
        bq._client = bq_client
        return bq

    def _build_mock_cloud_storage_client(self, tmp_blob_uri=''):
        gcs_client = mock.MagicMock()
        gcs_client.upload_table.return_value = tmp_blob_uri
        return gcs_client

    @property
    def default_table(self):
        return Table([
            {'num': 1, 'ltr': 'a'},
            {'num': 2, 'ltr': 'b'},
        ])
