bigquery: add copy functionality #127

Merged: 3 commits, Dec 20, 2019
116 changes: 114 additions & 2 deletions parsons/google/google_bigquery.py
@@ -1,9 +1,13 @@
from google.cloud.bigquery import Client
from google.cloud import bigquery
from google.cloud import exceptions
from parsons import Table
from parsons.google.utitities import setup_google_application_credentials
from parsons.google.google_cloud_storage import GoogleCloudStorage
from parsons.utilities import check_env
from parsons.utilities.files import create_temp_file
import petl
import pickle
import uuid


class GoogleBigQuery:
@@ -34,6 +38,8 @@ class GoogleBigQuery:
"""

def __init__(self, app_creds=None, project=None, location=None):
self.app_creds = app_creds

setup_google_application_credentials(app_creds)

self.project = project
@@ -44,6 +50,90 @@ def __init__(self, app_creds=None, project=None, location=None):
# This attribute will be used to hold the client once we have created it.
self._client = None

def copy(self, table_obj, dataset_name, table_name, if_exists='fail',
tmp_gcs_bucket=None, gcs_client=None, job_config=None, **load_kwargs):
"""
Copy a :ref:`parsons-table` into Google BigQuery via Google Cloud Storage.

`Args:`
table_obj: obj
The Parsons Table to copy into BigQuery.
dataset_name: str
The dataset name to load the data into.
table_name: str
The table name to load the data into.
if_exists: str
If the table already exists, either ``fail``, ``append``, ``drop``
or ``truncate`` the table.
tmp_gcs_bucket: str
The name of the Google Cloud Storage bucket to use to stage the data to load
into BigQuery. Required if `GCS_TEMP_BUCKET` is not specified.
gcs_client: object
The GoogleCloudStorage Connector to use for loading data into Google Cloud Storage.
job_config: object
A LoadJobConfig object to provide to the underlying call to load_table_from_uri
on the BigQuery client. The function will create its own if not provided.
**load_kwargs: kwargs
Arguments to pass to the underlying load_table_from_uri call on the BigQuery
client.
"""
tmp_gcs_bucket = check_env.check('GCS_TEMP_BUCKET', tmp_gcs_bucket)

if if_exists not in ['fail', 'truncate', 'append', 'drop']:
raise ValueError(f'Unexpected value for if_exists: {if_exists}, must be one of '
'"append", "drop", "truncate", or "fail"')

table_exists = self.table_exists(dataset_name, table_name)

if not job_config:
job_config = bigquery.LoadJobConfig()
job_config.autodetect = True

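# Default load behavior: skip the CSV header row, create the destination
# table if it does not exist, and only write if the destination table is empty.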
job_config.skip_leading_rows = 1
job_config.source_format = bigquery.SourceFormat.CSV
job_config.write_disposition = bigquery.WriteDisposition.WRITE_EMPTY
job_config.create_disposition = bigquery.CreateDisposition.CREATE_IF_NEEDED

dataset_ref = self.client.dataset(dataset_name)

if table_exists:
if if_exists == 'fail':
raise ValueError('Table already exists.')
elif if_exists == 'drop':
self.delete_table(dataset_name, table_name)
elif if_exists == 'append':
job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND
elif if_exists == 'truncate':
job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE

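# Stage the table in the temporary GCS bucket under a randomly generated blob name.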
gcs_client = gcs_client or GoogleCloudStorage()
temp_blob_name = f'{uuid.uuid4()}.csv'
temp_blob_uri = gcs_client.upload_table(table_obj, tmp_gcs_bucket, temp_blob_name)

# load CSV from Cloud Storage into BigQuery
try:
load_job = self.client.load_table_from_uri(
temp_blob_uri, dataset_ref.table(table_name),
job_config=job_config, **load_kwargs,
)
load_job.result()
finally:
gcs_client.delete_blob(tmp_gcs_bucket, temp_blob_name)

def delete_table(self, dataset_name, table_name):
"""
Delete a BigQuery table.

`Args:`
dataset_name: str
The name of the dataset that the table lives in.
table_name: str
The name of the table to delete.
"""
dataset_ref = self.client.dataset(dataset_name)
table_ref = dataset_ref.table(table_name)
self.client.delete_table(table_ref)

def query(self, sql):
"""
Run a BigQuery query and return the results as a Parsons table.
@@ -87,6 +177,28 @@ def query(self):

return final_table

def table_exists(self, dataset_name, table_name):
"""
Check whether or not the Google BigQuery table exists in the specified dataset.

`Args:`
dataset_name: str
The name of the BigQuery dataset to check in
table_name: str
The name of the BigQuery table to check for
`Returns:`
bool
True if the table exists in the specified dataset, false otherwise
"""
dataset = self.client.dataset(dataset_name)
table_ref = dataset.table(table_name)
try:
self.client.get_table(table_ref)
except exceptions.NotFound:
return False

return True

@property
def client(self):
"""
@@ -97,6 +209,6 @@ def client(self):
"""
if not self._client:
# Create a BigQuery client to use to make the query
self._client = Client(project=self.project, location=self.location)
self._client = bigquery.Client(project=self.project, location=self.location)

return self._client
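
A minimal usage sketch of the new GoogleBigQuery methods, assuming Google application credentials are already configured; the dataset, table, and bucket names below are hypothetical:

from parsons import Table
from parsons.google.google_bigquery import GoogleBigQuery

# A small example table; any Parsons Table works here.
tbl = Table([
    {'id': 1, 'name': 'alice'},
    {'id': 2, 'name': 'bob'},
])

bq = GoogleBigQuery()

# Stage the table through the hypothetical 'my-temp-bucket' GCS bucket and
# load it into my_dataset.people, appending if the table already exists.
bq.copy(tbl, 'my_dataset', 'people', if_exists='append',
        tmp_gcs_bucket='my-temp-bucket')

# The other new helpers can also be called on their own.
assert bq.table_exists('my_dataset', 'people')
bq.delete_table('my_dataset', 'people')  # clean up the example table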
34 changes: 34 additions & 0 deletions parsons/google/google_cloud_storage.py
@@ -258,3 +258,37 @@ def delete_blob(self, bucket_name, blob_name):
blob = self.get_blob(bucket_name, blob_name)
blob.delete()
logger.info(f'{blob_name} blob in {bucket_name} bucket deleted.')

def upload_table(self, table, bucket_name, blob_name, data_type='csv', default_acl=None):
"""
Load the data from a Parsons table into a blob.

`Args:`
table: obj
A :ref:`parsons-table`
bucket_name: str
The name of the bucket to upload the data into.
blob_name: str
The name of the blob to upload the data into.
data_type: str
The file format to use when writing the data. One of: `csv` or `json`
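default_acl: str
An optional predefined ACL to apply to the uploaded blob, passed through to the underlying `upload_from_filename` call.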
"""
bucket = storage.Bucket(self.client, name=bucket_name)
blob = storage.Blob(blob_name, bucket)

if data_type == 'csv':
local_file = table.to_csv()
content_type = 'text/csv'
elif data_type == 'json':
local_file = table.to_json()
content_type = 'application/json'
else:
raise ValueError(f'Unknown data_type value ({data_type}): must be one of: csv or json')

try:
blob.upload_from_filename(local_file, content_type=content_type, client=self.client,
predefined_acl=default_acl)
finally:
files.close_temp_file(local_file)

return f'gs://{bucket_name}/{blob_name}'
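
As a standalone sketch, assuming credentials are configured, the new upload_table helper could be used like this (the bucket and blob names are hypothetical):

from parsons import Table
from parsons.google.google_cloud_storage import GoogleCloudStorage

gcs = GoogleCloudStorage()
tbl = Table([{'num': 1}, {'num': 2}])

# Serializes the table to a temporary CSV file, uploads it to the bucket, and
# returns a URI of the form 'gs://my-temp-bucket/staging.csv'.
uri = gcs.upload_table(tbl, 'my-temp-bucket', 'staging.csv')

# JSON output is also supported via the data_type argument.
gcs.upload_table(tbl, 'my-temp-bucket', 'staging.json', data_type='json')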
146 changes: 137 additions & 9 deletions test/test_google/test_google_bigquery.py
@@ -1,7 +1,10 @@
import os
import unittest
import unittest.mock as mock
from google.cloud import bigquery
from google.cloud import exceptions
from parsons.google.google_bigquery import GoogleBigQuery
from parsons import Table


# Test class to fake the RowIterator interface for BigQuery job results
@@ -15,14 +18,15 @@ def __iter__(self):


class TestGoogleBigQuery(unittest.TestCase):
def test_query(self):
def setUp(self):
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = 'foo'
self.tmp_gcs_bucket = 'tmp'

def test_query(self):
query_string = 'select * from table'

# Pass the mock class into our GoogleBigQuery constructor
bq = GoogleBigQuery()
bq._client = self._build_mock_client([{'one': 1, 'two': 2}])
bq = self._build_mock_client_for_querying([{'one': 1, 'two': 2}])

# Run a query against our parsons GoogleBigQuery class
result = bq.query(query_string)
Expand All @@ -33,21 +37,123 @@ def test_query(self):
self.assertEqual(result[0], {'one': 1, 'two': 2})

def test_query__no_results(self):
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = 'foo'

query_string = 'select * from table'

# Pass the mock class into our GoogleBigQuery constructor
bq = GoogleBigQuery()
bq._client = self._build_mock_client([])
bq = self._build_mock_client_for_querying([])

# Run a query against our parsons GoogleBigQuery class
result = bq.query(query_string)

# Check our return value
self.assertEqual(result, None)

def _build_mock_client(self, results):
def test_copy(self):
# setup dependencies / inputs
tmp_blob_uri = 'gs://tmp/file'

# set up object under test
gcs_client = self._build_mock_cloud_storage_client(tmp_blob_uri)
tbl = self.default_table
bq = self._build_mock_client_for_copying(table_exists=False)

# call the method being tested
bq.copy(tbl, 'dataset', 'table', tmp_gcs_bucket=self.tmp_gcs_bucket,
gcs_client=gcs_client)

# check that the method did the right things
self.assertEqual(gcs_client.upload_table.call_count, 1)
upload_call_args = gcs_client.upload_table.call_args
self.assertEqual(upload_call_args[0][0], tbl)
self.assertEqual(upload_call_args[0][1], self.tmp_gcs_bucket)
tmp_blob_name = upload_call_args[0][2]

self.assertEqual(bq.client.load_table_from_uri.call_count, 1)
load_call_args = bq.client.load_table_from_uri.call_args
self.assertEqual(load_call_args[0][0], tmp_blob_uri)

job_config = load_call_args[1]['job_config']
self.assertEqual(job_config.write_disposition,
bigquery.WriteDisposition.WRITE_EMPTY)

# make sure we cleaned up the temp file
self.assertEqual(gcs_client.delete_blob.call_count, 1)
delete_call_args = gcs_client.delete_blob.call_args
self.assertEqual(delete_call_args[0][0], self.tmp_gcs_bucket)
self.assertEqual(delete_call_args[0][1], tmp_blob_name)

def test_copy__if_exists_truncate(self):
gcs_client = self._build_mock_cloud_storage_client()
# set up object under test
bq = self._build_mock_client_for_copying()

# call the method being tested
bq.copy(self.default_table, 'dataset', 'table', tmp_gcs_bucket=self.tmp_gcs_bucket,
if_exists='truncate', gcs_client=gcs_client)

# check that the method did the right things
call_args = bq.client.load_table_from_uri.call_args
job_config = call_args[1]['job_config']
self.assertEqual(job_config.write_disposition,
bigquery.WriteDisposition.WRITE_TRUNCATE)

# make sure we cleaned up the temp file
self.assertEqual(gcs_client.delete_blob.call_count, 1)

def test_copy__if_exists_append(self):
gcs_client = self._build_mock_cloud_storage_client()
# set up object under test
bq = self._build_mock_client_for_copying()

# call the method being tested
bq.copy(self.default_table, 'dataset', 'table', tmp_gcs_bucket=self.tmp_gcs_bucket,
if_exists='append', gcs_client=gcs_client)

# check that the method did the right things
call_args = bq.client.load_table_from_uri.call_args
job_config = call_args[1]['job_config']
self.assertEqual(job_config.write_disposition,
bigquery.WriteDisposition.WRITE_APPEND)

# make sure we cleaned up the temp file
self.assertEqual(gcs_client.delete_blob.call_count, 1)

def test_copy__if_exists_fail(self):
# set up object under test
bq = self._build_mock_client_for_copying()

# call the method being tested
with self.assertRaises(Exception):
bq.copy(self.default_table, 'dataset', 'table', tmp_gcs_bucket=self.tmp_gcs_bucket,
gcs_client=self._build_mock_cloud_storage_client())

def test_copy__if_exists_drop(self):
gcs_client = self._build_mock_cloud_storage_client()
# set up object under test
bq = self._build_mock_client_for_copying()

# call the method being tested
bq.copy(self.default_table, 'dataset', 'table', tmp_gcs_bucket=self.tmp_gcs_bucket,
if_exists='drop', gcs_client=gcs_client)

# check that we tried to delete the table
self.assertEqual(bq.client.delete_table.call_count, 1)

# make sure we cleaned up the temp file
self.assertEqual(gcs_client.delete_blob.call_count, 1)

def test_copy__bad_if_exists(self):
gcs_client = self._build_mock_cloud_storage_client()

# set up object under test
bq = self._build_mock_client_for_copying()

# call the method being tested
with self.assertRaises(ValueError):
bq.copy(self.default_table, 'dataset', 'table', tmp_gcs_bucket=self.tmp_gcs_bucket,
if_exists='foo', gcs_client=gcs_client)

def _build_mock_client_for_querying(self, results):
# Create a mock that will play the role of the query job
query_job = mock.MagicMock()
query_job.result.return_value = FakeResults(results)
@@ -56,4 +162,26 @@ def _build_mock_client(self, results):
client = mock.MagicMock()
client.query.return_value = query_job

return client
bq = GoogleBigQuery()
bq._client = client
return bq

def _build_mock_client_for_copying(self, table_exists=True):
bq_client = mock.MagicMock()
if not table_exists:
bq_client.get_table.side_effect = exceptions.NotFound('not found')
bq = GoogleBigQuery()
bq._client = bq_client
return bq

def _build_mock_cloud_storage_client(self, tmp_blob_uri=''):
gcs_client = mock.MagicMock()
gcs_client.upload_table.return_value = tmp_blob_uri
return gcs_client

@property
def default_table(self):
return Table([
{'num': 1, 'ltr': 'a'},
{'num': 2, 'ltr': 'b'},
])