
992 Bug: Parsons BigQuery upsert is broken #994

Merged
10 commits merged into main from 992-bug-parsons-bigquery-upsert-is-broken on Feb 27, 2024

Conversation

@willyraedy (Contributor) commented Feb 19, 2024

Fixes the upsert bug by adding "template_table" back into the API for the three copy methods (copy, copy_s3, copy_from_gcs).

Moved the schema determination logic into a single private function and moved some error handling into a lower level wrapper of the BQ client's load_table_from_uri. Also, did some basic tidying in a few functions.

These are some of the most central methods in the class. Obviously the unit tests are passing, I ran your test script, and I got halfway through our Catalist load locally. But if people can do some meaningful tire-kicking with integration tests in your own projects, I think that would be a good use of time.
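For anyone kicking the tires, a minimal usage sketch of the restored parameter (the bucket URI and table names are placeholders; the keyword names follow the copy_from_gcs call shapes elsewhere in this diff):

from parsons.google.google_bigquery import GoogleBigQuery

bq = GoogleBigQuery()

# Hypothetical load: stage a GCS export into a new table, borrowing the schema
# from an existing table via template_table (the parameter this PR restores to
# copy, copy_s3, and copy_from_gcs).
bq.copy_from_gcs(
    gcs_blob_uri="gs://example-bucket/export.csv",  # placeholder URI
    table_name="my_dataset.staging_table",          # placeholder destination
    if_exists="drop",
    template_table="my_dataset.existing_table",     # existing table whose schema is reused
)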

@willyraedy willyraedy linked an issue Feb 19, 2024 that may be closed by this pull request
@willyraedy willyraedy self-assigned this Feb 19, 2024
@willyraedy (Contributor Author) left a comment

I think this is definitely better but still doesn't feel all that clean. A few notes:

@@ -42,13 +42,6 @@
QUERY_BATCH_SIZE = 100000


def get_table_ref(client, table_name):
willyraedy (Contributor Author):

Moved this into the class; thought it might be helpful, and it made it easier to mock.
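(For what it's worth, a quick sketch of the kind of mocking this enables in tests; assumes the connector can be constructed in the test environment:)

from unittest import mock

from parsons.google.google_bigquery import GoogleBigQuery

bq = GoogleBigQuery()
# Patch the method on the instance instead of a module-level function.
bq.get_table_ref = mock.MagicMock(return_value="my_dataset.my_table")  # stand-in return value for illustration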

parsons/google/google_bigquery.py
dataset_ref = self.client.dataset(parsed["dataset"])
return dataset_ref.table(parsed["table"])

def _get_job_config_schema(
willyraedy (Contributor Author):

Moved all the schema logic, including the changes Austin added a couple of PRs ago, into this one function.

Comment on lines +1127 to +1150
# if job.schema already set in job_config, do nothing
if job_config.schema:
return job_config.schema
# if schema specified by user, convert to schema type and use that
if custom_schema:
return map_column_headers_to_schema_field(custom_schema)
# if template_table specified by user, use that
# otherwise, if loading into existing table, infer destination table as template table
if not template_table and if_exists in ("append", "truncate"):
template_table = destination_table_name
# if template_table set, use it to set the load schema
if template_table:
try:
bigquery_table = self.client.get_table(template_table)
return bigquery_table.schema
except google.api_core.exceptions.NotFound:
logger.warning(
f"template_table '{template_table}' not found. Unable to set schema."
)
# if load is coming from a Parsons table, use that to generate schema
if parsons_table:
return self._generate_schema_from_parsons_table(parsons_table)

return None
willyraedy (Contributor Author):

This logic is probably the most important to review closely.
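To make that review easier, here is a hedged sketch of how the precedence above plays out; the keyword names are inferred from the function body and the real signature may differ:

from google.cloud import bigquery

from parsons.google.google_bigquery import GoogleBigQuery

bq = GoogleBigQuery()

# Appending to an existing table with no explicit schema, custom_schema, or
# template_table: the destination table itself is treated as the template, so
# the load inherits that table's current schema.
schema = bq._get_job_config_schema(
    job_config=bigquery.LoadJobConfig(),
    custom_schema=None,
    template_table=None,
    if_exists="append",
    destination_table_name="my_dataset.my_table",  # placeholder
    parsons_table=None,
)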

Comment on lines +1319 to +1328
except exceptions.BadRequest as e:
for idx, error_ in enumerate(load_job.errors):
if idx == 0:
logger.error(
"* Load job failed. Enumerating errors collection below:"
)
logger.error(f"** Error collection - index {idx}:")
logger.error(error_)

raise e
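For orientation, a minimal sketch of the wrapper this block now lives in; the signature is inferred from the call site further down the diff, and the result/return details are assumptions rather than the exact implementation:

def _load_table_from_uri(self, source_uris, destination, job_config=None, **load_kwargs):
    load_job = self.client.load_table_from_uri(
        source_uris=source_uris,
        destination=destination,
        job_config=job_config,
        **load_kwargs,
    )
    try:
        load_job.result()  # block until the load finishes; raises on failure
        return load_job
    except exceptions.BadRequest as e:
        # Always surface the full error collection before re-raising, so the
        # root cause isn't hidden behind a generic BadRequest message.
        for idx, error_ in enumerate(load_job.errors):
            if idx == 0:
                logger.error("* Load job failed. Enumerating errors collection below:")
            logger.error(f"** Error collection - index {idx}:")
            logger.error(error_)
        raise e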
willyraedy (Contributor Author):

Moved the logic that loops through the error collection into the lower-level function, because I think we'll always want to do this. @IanRFerguson I know we had that weird edge case where some of this logic was throwing. Do you remember what that was? (I should have made a unit test for it.)

Contributor:
Yeah, that was when the Catalist flat file size exceeded Google's maximum compressed file size; any of the state-level models are a good test case for that.

@willyraedy willyraedy marked this pull request as ready for review February 19, 2024 23:06
@Jason94 (Collaborator) left a comment

I think this is working. I tested it on my simple script from the issue I posted, and that worked fine. I have a longer script that is failing, but I think that's because of a separate issue unrelated to upsert specifically. I'm going to do a little more testing on that and, assuming it's unrelated, I'll approve once these few small changes are made.

parsons/google/google_bigquery.py
@@ -338,8 +338,9 @@ def test_copy(self):
gcs_client = self._build_mock_cloud_storage_client(tmp_blob_uri)
tbl = self.default_table
bq = self._build_mock_client_for_copying(table_exists=False)
bq.copy_from_gcs = mock.MagicMock()
table_name = ("dataset.table",)
bq._load_table_from_uri = mock.MagicMock()
Collaborator:

To be honest, I'm not sure if these unit tests have any value. I'm a big fan of tests, and even TDD where it makes sense, but some code just doesn't have any testable surface area. (Outside of integration tests that do actually hit the external service.)

There are a few code smells here, like mocking private methods and asserting that private methods were called a certain number of times.

This might be too big of a change for this commit, but my vote would be that we just scrap these tests.

Collaborator:

At the very least, I think a better long term direction for these tests would be to mock out the client object that the BQ connector holds on to, but not to mock the private methods of the BQ connector itself.

Collaborator:
(Again, I don't think that's worth changing in this PR, though.)
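(A rough sketch of that direction, with placeholder values:)

from unittest import mock

import google.api_core.exceptions

from parsons.google.google_bigquery import GoogleBigQuery

bq = GoogleBigQuery()
bq.client = mock.MagicMock()  # mock the underlying BigQuery client, not the connector's own methods
bq.client.get_table.side_effect = google.api_core.exceptions.NotFound("no such table")
# ...then exercise the public copy_* methods and assert on bq.client calls.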

Collaborator:

As an aside, this is an endemic problem for many Parsons tests. There are many tests that are essentially:

Foo.bar = mock.MagicMock()
Foo.bar.return_value = 'fibble'
result = Foo.bar()
assert result == 'fibble'

Collaborator:

Yeah, more than willing to rethink how we do tests, but don't let it get in the way of this particular PR.

Collaborator:

Yeah, I guess Will should probably just leave this as-is in this PR. More of a philosophical point, then.

Comment on lines -822 to 803
self.copy_from_gcs(
gcs_blob_uri=temp_blob_uri,
table_name=table_name,
if_exists=if_exists,
max_errors=max_errors,
self._load_table_from_uri(
source_uris=temp_blob_uri,
destination=self.get_table_ref(table_name=table_name),
job_config=job_config,
**load_kwargs,
)
willyraedy (Contributor Author):

@Jason94 I think this might be the source of the bug you reported in Slack. copy used to call copy_from_gcs under the hood, which meant it used the defaults set in the copy_from_gcs function signature, and those are different from the defaults of the underlying Google API.

I think our options are:

  1. add those same parameters and defaults to the copy function
  2. modify the copy function to still call copy_from_gcs under the hood

I'd vote for number 1. Calling the lower-level _load_table_from_uri function made it a little easier to manage the schema stuff, but we could probably figure it out.
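As a sketch of what option 1 might look like (the parameter list and defaults here are placeholders, not the final signature):

def copy(
    self,
    tbl,
    table_name,
    if_exists="fail",
    max_errors=0,
    template_table=None,
    **load_kwargs,
):
    # Carry the same explicit defaults copy_from_gcs exposes, so callers of
    # copy keep the connector's behavior rather than falling back to the
    # Google client's built-in defaults.
    ...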

@Jason94 (Collaborator) commented Feb 21, 2024

Yeah, I agree. # 1 seems fine to me. Thanks!

Collaborator:

For reference, here's the script:

import dotenv
from parsons import Table
from parsons.google.google_bigquery import GoogleBigQuery

dotenv.load_dotenv(override=True)


def main():
    bq = GoogleBigQuery()

    data = Table([{"id": 1, "text": "Test\ntext"}])

    bq.copy(data, "test.newlines", if_exists="drop")


if __name__ == "__main__":
    main()

@Jason94 (Collaborator) left a comment

Everything looks good to me, except for one issue: all of the instances of the template_table parameter in the google_bigquery.py file need to be Optional[str], not str.

I've tested it against the upsert and newlines bugs and it passes both! Once you fix the issue above it looks good to me, so I'm going to go ahead and approve.
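Concretely, something along these lines (illustrative fragment only; the surrounding parameters are trimmed placeholders):

from typing import Optional

def copy_from_gcs(
    self,
    gcs_blob_uri: str,
    table_name: str,
    template_table: Optional[str] = None,  # Optional[str], since None is the default
    **load_kwargs,
):
    ...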

@KasiaHinkson KasiaHinkson merged commit a9af0af into main Feb 27, 2024
10 checks passed
@KasiaHinkson KasiaHinkson deleted the 992-bug-parsons-bigquery-upsert-is-broken branch February 27, 2024 18:04

Successfully merging this pull request may close these issues.

[Bug] Parsons' BigQuery Upsert is Broken
6 participants