992 bug parsons bigquery upsert is broken #994
Conversation
I think this is definitely better, but it still doesn't feel all that clean. A few notes:
@@ -42,13 +42,6 @@
QUERY_BATCH_SIZE = 100000


def get_table_ref(client, table_name):
Moved this to the class. I thought it might be helpful, and it made it easier to mock.
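For example (hypothetical test code, not from this PR), mocking is more direct when the helper lives on the class:

from unittest import mock

from parsons.google.google_bigquery import GoogleBigQuery

# As a module-level helper, get_table_ref had to be patched by import path:
#     mock.patch("parsons.google.google_bigquery.get_table_ref")
# As a method on the connector, it can be patched on the class directly:
with mock.patch.object(GoogleBigQuery, "get_table_ref") as mock_get_table_ref:
    mock_get_table_ref.return_value = "fake-table-ref"
    # ... exercise the code under test here ...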
dataset_ref = self.client.dataset(parsed["dataset"])
return dataset_ref.table(parsed["table"])

def _get_job_config_schema(
Moved all the schema logic, including the stuff Austin added a couple of PRs ago, into this one function.
# if job.schema already set in job_config, do nothing
if job_config.schema:
    return job_config.schema
# if schema specified by user, convert to schema type and use that
if custom_schema:
    return map_column_headers_to_schema_field(custom_schema)
# if template_table specified by user, use that
# otherwise, if loading into existing table, infer destination table as template table
if not template_table and if_exists in ("append", "truncate"):
    template_table = destination_table_name
# if template_table set, use it to set the load schema
if template_table:
    try:
        bigquery_table = self.client.get_table(template_table)
        return bigquery_table.schema
    except google.api_core.exceptions.NotFound:
        logger.warning(
            f"template_table '{template_table}' not found. Unable to set schema."
        )
# if load is coming from a Parsons table, use that to generate schema
if parsons_table:
    return self._generate_schema_from_parsons_table(parsons_table)

return None
This logic is probably the most important to review closely.
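To spell out the precedence from the caller's side (my reading of the branches above; table names are illustrative):

from parsons import Table
from parsons.google.google_bigquery import GoogleBigQuery

bq = GoogleBigQuery()
tbl = Table([{"id": 1, "name": "a"}])

# 1. A schema already set on the job_config, or a user-supplied custom
#    schema, wins over everything else.
# 2. An explicit template_table is used next:
bq.copy(tbl, "dataset.table", template_table="dataset.existing_template")
# 3. With append/truncate and no template, the destination table doubles as
#    the template, so the load inherits the existing table's schema:
bq.copy(tbl, "dataset.table", if_exists="append")
# 4. Otherwise the schema is generated from the Parsons table itself
#    (falling back to None if there is no Parsons table):
bq.copy(tbl, "dataset.new_table", if_exists="drop")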
except exceptions.BadRequest as e:
    for idx, error_ in enumerate(load_job.errors):
        if idx == 0:
            logger.error(
                "* Load job failed. Enumerating errors collection below:"
            )
        logger.error(f"** Error collection - index {idx}:")
        logger.error(error_)

    raise e
Moved this logic that loops through the error collections into the lower-level function, because I think we'll always want to do this. @IanRFerguson, I know we had that weird edge case where some of this logic was throwing. Do you remember what that was? (We should have made a unit test for it.)
Yeah, that was when the Catalist flat file size exceeded Google's maximum compressed file size ... any of the state-level models are a good test case for that.
I think this is working. I tested it on the simple script in the issue I posted, and that worked fine. I have a longer script that is failing, but I think that's because of a separate issue unrelated to upsert specifically. I'm going to do a little more testing on that and, assuming it's unrelated, I'll approve after these few small changes are made.
@@ -338,8 +338,9 @@ def test_copy(self):
gcs_client = self._build_mock_cloud_storage_client(tmp_blob_uri)
tbl = self.default_table
bq = self._build_mock_client_for_copying(table_exists=False)
bq.copy_from_gcs = mock.MagicMock()
table_name = ("dataset.table",)
bq._load_table_from_uri = mock.MagicMock()
To be honest, I'm not sure if these unit tests have any value. I'm a big fan of tests, and even TDD where it makes sense, but some code just doesn't have any testable surface area. (Outside of integration tests that do actually hit the external service.)
There are a few code smells here, like mocking private methods and asserting that private methods were called a certain number of times.
This might be too big of a change for this commit, but my vote would be that we just scrap these tests.
At the very least, I think a better long-term direction for these tests would be to mock out the client object that the BQ connector holds on to, but not to mock the private methods of the BQ connector itself.
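Something like this is roughly what I mean (a sketch under assumptions about how the connector exposes its client, not working test code from this PR):

from unittest import mock

from parsons import Table
from parsons.google.google_bigquery import GoogleBigQuery


def test_copy_calls_the_google_client():
    fake_client = mock.MagicMock()
    # Stub the library boundary (the client the connector holds on to) rather
    # than the connector's private methods; the GCS staging client would need
    # similar treatment for copy() to run end to end.
    with mock.patch.object(GoogleBigQuery, "client", fake_client):
        bq = GoogleBigQuery()
        bq.copy(Table([{"id": 1}]), "dataset.table", if_exists="drop")

    # Assert against the external call we care about, not our own internals.
    fake_client.load_table_from_uri.assert_called_once()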
(Again, I don't think that's worth changing in this PR, though.)
As an aside, this is an endemic problem for many parsons tests. There are many tests that are essentially:
Foo.bar = mock.MagicMock()
Foo.bar.return_value = 'fibble'
result = Foo.bar()
assert result == 'fibble'
Yeah, more than willing to rethink how we do tests, but don't let it get in the way of this particular PR.
Yeah, I guess that Will should probably just leave this as-is in this PR. More of a philosophical point, then.
-        self.copy_from_gcs(
-            gcs_blob_uri=temp_blob_uri,
-            table_name=table_name,
-            if_exists=if_exists,
-            max_errors=max_errors,
+        self._load_table_from_uri(
+            source_uris=temp_blob_uri,
+            destination=self.get_table_ref(table_name=table_name),
+            job_config=job_config,
+            **load_kwargs,
         )
@Jason94 I think this might be the source of the bug you reported in Slack. copy used to call copy_from_gcs under the hood, which meant it used the defaults set in the copy_from_gcs function signature, which are different from the defaults of the underlying Google API.
I think our options are:
1. Add those same parameters and defaults to the copy function.
2. Modify the copy function to still call copy_from_gcs under the hood.
I think I'd vote for number 1. Calling the lower-level _load_table_from_uri function made it a little easier to manage the schema stuff, but we could probably figure it out.
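A rough sketch of option 1 (the parameter names and defaults shown are assumptions meant to mirror copy_from_gcs, not a final signature):

class GoogleBigQuery:  # excerpt, illustrative only
    def copy(
        self,
        tbl,
        table_name,
        if_exists="fail",      # assumed to mirror copy_from_gcs's default
        max_errors=0,          # assumed to mirror copy_from_gcs's default
        template_table=None,
        **load_kwargs,
    ):
        # Thread these arguments into the load job config (the same place the
        # schema logic above reads from), then hand off to the low-level call:
        #   self._load_table_from_uri(source_uris=..., destination=...,
        #                             job_config=job_config, **load_kwargs)
        ...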
Yeah, I agree. #1 seems fine to me. Thanks!
For reference, here's the script:
import dotenv
from parsons import Table
from parsons.google.google_bigquery import GoogleBigQuery

dotenv.load_dotenv(override=True)


def main():
    bq = GoogleBigQuery()
    data = Table([{"id": 1, "text": "Test\ntext"}])
    bq.copy(data, "test.newlines", if_exists="drop")


if __name__ == "__main__":
    main()
Everything looks good to me, except for the issue that all of the instances of the template_table parameter in the google_bigquery.py file need to be Optional[str], not str.
I've tested it against the upsert and newlines bugs and it passes both! Once you fix the issue above, it looks good to me, so I'm going to go ahead and approve.
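Concretely, the kind of change I mean (parameter list abbreviated, purely illustrative):

from typing import Optional

def copy_from_gcs(
    self,
    gcs_blob_uri: str,
    table_name: str,
    template_table: Optional[str] = None,  # Optional[str], not str, since it defaults to None
    **load_kwargs,
):
    ...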
…ithub.com/move-coop/parsons into 992-bug-parsons-bigquery-upsert-is-broken
Fixes the upsert bug by adding template_table back into the API for the three copy methods (copy, copy_s3, copy_from_gcs).
Moved the schema determination logic into a single private function and moved some error handling into a lower-level wrapper of the BQ client's load_table_from_uri. Also did some basic tidying in a few functions.
These are some of the most central methods in the class. The unit tests are passing, your test script works, and I got halfway through our Catalist load locally. But if people can do some meaningful kicking of the tires with integration tests in your projects, I think that would be a good use of time.
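For anyone kicking the tires, a quick upsert smoke test along the lines of the newlines script above (this assumes an upsert signature of roughly upsert(table, target_table, primary_key); adjust to the actual method):

from parsons import Table
from parsons.google.google_bigquery import GoogleBigQuery

bq = GoogleBigQuery()

# Seed a table, then upsert one changed row and one new row.
bq.copy(Table([{"id": 1, "value": "old"}]), "test.upsert_check", if_exists="drop")
bq.upsert(
    Table([{"id": 1, "value": "new"}, {"id": 2, "value": "added"}]),
    "test.upsert_check",
    primary_key="id",
)

# Expect two rows: id 1 updated to "new", id 2 added.
print(bq.query("SELECT * FROM test.upsert_check ORDER BY id"))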