Resolve GCP Test Failures For Major Release #948

Merged: 8 commits, Dec 8, 2023
18 changes: 10 additions & 8 deletions docs/google.rst
@@ -68,7 +68,7 @@ Google Cloud projects.
Quickstart
==========

To instantiate the GoogleBigQuery class, you can pass the constructor a string containing either the name of the Google service account credentials file or a JSON string encoding those credentials. Alternatively, you can set the environment variable ``GOOGLE_APPLICATION_CREDENTIALS`` to be either of those strings and call the constructor without that argument.
To instantiate the `GoogleBigQuery` class, you can pass the constructor a string containing either the name of the Google service account credentials file or a JSON string encoding those credentials. Alternatively, you can set the environment variable ``GOOGLE_APPLICATION_CREDENTIALS`` to be either of those strings and call the constructor without that argument.

.. code-block:: python

@@ -78,16 +78,18 @@ To instantiate the GoogleBigQuery class, you can pass the constructor a string c
# be the file name or a JSON encoding of the credentials.
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = 'google_credentials_file.json'

big_query = GoogleBigQuery()
bigquery = GoogleBigQuery()

Alternatively, you can pass the credentials in as an argument. In the example below, we also specify the project.

.. code-block:: python

# Project in which we're working
project = 'parsons-test'
big_query = GoogleBigQuery(app_creds='google_credentials_file.json',
project=project)
bigquery = GoogleBigQuery(
app_creds='google_credentials_file.json',
project=project
)

We can now upload/query data.

@@ -107,23 +109,23 @@ We can now upload/query data.
gcs_temp_bucket = 'parsons_bucket'

# Create dataset if it doesn't already exist
big_query.client.create_dataset(dataset=dataset, exists_ok=True)
bigquery.client.create_dataset(dataset=dataset, exists_ok=True)

parsons_table = Table([{'name':'Bob', 'party':'D'},
{'name':'Jane', 'party':'D'},
{'name':'Sue', 'party':'R'},
{'name':'Bill', 'party':'I'}])

# Copy table in to create new BigQuery table
big_query.copy(table_obj=parsons_table,
bigquery.copy(table_obj=parsons_table,
table_name=table_name,
tmp_gcs_bucket=gcs_temp_bucket)

# Select from project.dataset.table
big_query.query(f'select name from {table_name} where party = "D"')
bigquery.query(f'select name from {table_name} where party = "D"')

# Delete the table when we're done
big_query.client.delete_table(table=table_name)
bigquery.client.delete_table(table=table_name)

===
API
2 changes: 1 addition & 1 deletion parsons/databases/discover_database.py
@@ -5,7 +5,7 @@
from parsons.databases.redshift import Redshift
from parsons.databases.mysql import MySQL
from parsons.databases.postgres import Postgres
from parsons import GoogleBigQuery as BigQuery
from parsons.google.google_bigquery import GoogleBigQuery as BigQuery


def discover_database(
2 changes: 1 addition & 1 deletion parsons/etl/tofrom.py
@@ -639,7 +639,7 @@ def to_bigquery(
project: str
The project which the client is acting on behalf of. If not passed
then will use the default inferred environment.
\**kwargs: kwargs
**kwargs: kwargs
Additional keyword arguments passed into the `.copy()` function (`if_exists`,
`max_errors`, etc.)
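As a rough usage sketch (the destination table argument and keyword values below are assumptions for illustration, not taken from this diff), the pass-through kwargs let a caller forward copy options directly:

    from parsons import Table

    tbl = Table([{'name': 'Bob', 'party': 'D'}])
    tbl.to_bigquery(
        'my_dataset.people',    # assumed destination table argument
        project='parsons-test',
        if_exists='append',     # forwarded to the underlying .copy() call
    )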

64 changes: 37 additions & 27 deletions parsons/google/google_bigquery.py
@@ -18,7 +18,7 @@
from parsons.google.utitities import setup_google_application_credentials
from parsons.google.google_cloud_storage import GoogleCloudStorage
from parsons.utilities import check_env
from parsons.utilities.files import create_temp_file, is_gzip_path
from parsons.utilities.files import create_temp_file

logger = logging.getLogger(__name__)

@@ -109,7 +109,7 @@ def map_column_headers_to_schema_field(schema_definition: list) -> list:
"""

# TODO - Better way to test for this
if type(schema_definition[0]) == bigquery.SchemaField:
if isinstance(schema_definition[0], bigquery.SchemaField):
logger.debug("User supplied list of SchemaField objects")
return schema_definition

@@ -181,8 +181,10 @@ def connection(self):
The connection is set up as a python "context manager", so it will be closed
automatically when the connection goes out of scope. Note that the BigQuery
API uses jobs to run database operations and, as such, simply has a no-op for
a "commit" function. If you would like to manage transactions, please use
multi-statement queries as [outlined here](https://cloud.google.com/bigquery/docs/transactions)
a "commit" function.

If you would like to manage transactions, please use multi-statement queries
as [outlined here](https://cloud.google.com/bigquery/docs/transactions)
or utilize the `query_with_transaction` method on this class.

When using the connection, make sure to put it in a ``with`` block (necessary for
@@ -276,9 +278,10 @@ def query_with_connection(
if not commit:
raise ValueError(
"""
BigQuery implementation uses an API client which always auto-commits. If you wish to wrap
multiple queries in a transaction, use Mulit-Statement transactions within a single query
as outlined here: https://cloud.google.com/bigquery/docs/transactions or use the
BigQuery implementation uses an API client which always auto-commits.
If you wish to wrap multiple queries in a transaction, use
Multi-Statement transactions within a single query as outlined
here: https://cloud.google.com/bigquery/docs/transactions or use the
`query_with_transaction` method on the BigQuery connector.
"""
)
@@ -301,9 +304,7 @@ def query_with_transaction(self, queries, parameters=None):
queries_wrapped = f"""
BEGIN
BEGIN TRANSACTION;

{queries_on_newlines}

COMMIT TRANSACTION;
END;
"""
@@ -353,8 +354,8 @@ def copy_from_gcs(
nullas: str
Loads fields that match null_string as NULL, where null_string can be any string
allow_quoted_newlines: bool
If True, detects quoted new line characters within a CSV field and does not interpret
the quoted new line character as a row boundary
If True, detects quoted new line characters within a CSV field and does
not interpret the quoted new line character as a row boundary
allow_jagged_rows: bool
Allow missing trailing optional columns (CSV only).
quote: str
@@ -377,12 +378,14 @@
force_unzip_blobs: bool
If True, target blobs will be unzipped before being loaded to BigQuery.
compression_type: str
Accepts `zip` or `gzip` values to differentially unzip a compressed blob in cloud storage.
Accepts `zip` or `gzip` values to differentially unzip a compressed
blob in cloud storage.
new_file_extension: str
Provides a file extension if a blob is decompressed and rewritten to cloud storage.
Provides a file extension if a blob is decompressed and rewritten
to cloud storage.
**load_kwargs: kwargs
Other arguments to pass to the underlying load_table_from_uri call on the BigQuery
client.
Other arguments to pass to the underlying load_table_from_uri
call on the BigQuery client.
"""
if if_exists not in ["fail", "truncate", "append", "drop"]:
raise ValueError(
@@ -444,8 +447,10 @@ def copy_from_gcs(
except exceptions.BadRequest as e:
if "one of the files is larger than the maximum allowed size." in str(e):
logger.debug(
f"{gcs_blob_uri.split('/')[-1]} exceeds max size ... running decompression function..."
f"{gcs_blob_uri.split('/')[-1]} exceeds max size ... \
running decompression function..."
)

self.copy_large_compressed_file_from_gcs(
gcs_blob_uri=gcs_blob_uri,
table_name=table_name,
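A hedged sketch of the happy path for this loader (bucket, blob, and table names are invented; only parameters named in the docstring above are used). As the hunk above shows, compressed blobs that exceed the maximum allowed size fall back to copy_large_compressed_file_from_gcs:

    bigquery.copy_from_gcs(
        gcs_blob_uri="gs://parsons_bucket/people.csv",  # invented bucket/blob
        table_name="my_dataset.people",                 # invented destination
        if_exists="append",
        allow_quoted_newlines=True,
    )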
@@ -522,8 +527,8 @@ def copy_large_compressed_file_from_gcs(
nullas: str
Loads fields that match null_string as NULL, where null_string can be any string
allow_quoted_newlines: bool
If True, detects quoted new line characters within a CSV field and does not interpret
the quoted new line character as a row boundary
If True, detects quoted new line characters within a CSV field
and does not interpret the quoted new line character as a row boundary
allow_jagged_rows: bool
Allow missing trailing optional columns (CSV only).
quote: str
@@ -544,7 +549,8 @@
if there are any conflicts between the job_config and other parameters, the
job_config values are preferred.
compression_type: str
Accepts `zip` or `gzip` values to differentially unzip a compressed blob in cloud storage.
Accepts `zip` or `gzip` values to differentially unzip a compressed
blob in cloud storage.
new_file_extension: str
Provides a file extension if a blob is decompressed and rewritten to cloud storage.
**load_kwargs: kwargs
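A similar sketch for the large-compressed path (the URI and table name are invented; compression_type and new_file_extension mirror the values exercised in the updated test below):

    bigquery.copy_large_compressed_file_from_gcs(
        gcs_blob_uri="gs://parsons_bucket/large_export.csv.gz",  # invented
        table_name="my_dataset.people",                          # invented
        compression_type="gzip",
        new_file_extension="csv",
    )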
@@ -788,7 +794,8 @@ def duplicate_table(
destination_table: str
Name of destination schema and table (e.g. ``myschema.newtable``)
if_exists: str
If the table already exists, either ``fail``, ``replace``, or ``ignore`` the operation.
If the table already exists, either ``fail``, ``replace``, or
``ignore`` the operation.
drop_source_table: boolean
Drop the source table
"""
@@ -797,8 +804,11 @@
if if_exists == "fail" and self.table_exists(destination_table):
raise ValueError("Table already exists.")

table__replace_clause = "OR REPLACE " if if_exists == "replace" else ""
Contributor Author: Open to tweaking this; doing it in one line was too long for flake8 and this seemed a little cleaner.

table__exists_clause = " IF NOT EXISTS" if if_exists == "ignore" else ""

query = f"""
CREATE {'OR REPLACE ' if if_exists == 'replace' else ''}TABLE{' IF NOT EXISTS' if if_exists == 'ignore' else ''}
CREATE {table__replace_clause}TABLE{table__exists_clause}
{destination_table}
CLONE {source_table}
"""
@@ -833,8 +843,8 @@ def upsert(
A temp table is dropped by default on cleanup. You can set to False for debugging.
from_s3: boolean
Instead of specifying a table_obj (set the first argument to None),
set this to True and include :func:`~parsons.databases.bigquery.Bigquery.copy_s3` arguments
to upsert a pre-existing s3 file into the target_table
set this to True and include :func:`~parsons.databases.bigquery.Bigquery.copy_s3`
arguments to upsert a pre-existing s3 file into the target_table
\**copy_args: kwargs
See :func:`~parsons.databases.bigquery.BigQuery.copy` for options.
""" # noqa: W605
@@ -988,7 +998,7 @@ def get_views(self, schema, view: Optional[str] = None):

logger.debug("Retrieving views info.")
sql = f"""
select
select
table_schema as schema_name,
table_name as view_name,
view_definition
@@ -1016,10 +1026,10 @@ def get_columns(self, schema: str, table_name: str):
"""

base_query = f"""
SELECT
*
SELECT
*
FROM `{self.project}.{schema}.INFORMATION_SCHEMA.COLUMNS`
WHERE
WHERE
table_name = '{table_name}'
"""

3 changes: 2 additions & 1 deletion test/test_databases/test_bigquery.py
@@ -107,7 +107,7 @@ def test_query_with_transaction(self, create_temp_file_mock):
all(
[
text in keyword_args["sql"]
for text in queries + ["ROLLBACK", "BEGIN TRANSACTION", "COMMIT"]
for text in queries + ["BEGIN TRANSACTION", "COMMIT"]
]
)
)
@@ -279,6 +279,7 @@ def test_copy_large_compressed_file_from_gcs(
bucket_name="tmp",
blob_name="file.gzip",
new_file_extension="csv",
compression_type="gzip",
)
self.assertEqual(bq.client.load_table_from_uri.call_count, 1)
load_call_args = bq.client.load_table_from_uri.call_args