From 0de0fe16dbc0f8a31982f48b448935fbe4ba29f8 Mon Sep 17 00:00:00 2001 From: Willy Raedy Date: Mon, 19 Feb 2024 14:56:48 -0600 Subject: [PATCH 1/8] add template table to all copy functions, refactor schema generation, clean up error handling --- parsons/google/google_bigquery.py | 252 ++++++++++++++++++------------ 1 file changed, 156 insertions(+), 96 deletions(-) diff --git a/parsons/google/google_bigquery.py b/parsons/google/google_bigquery.py index c4002b7997..a84cbe6e76 100644 --- a/parsons/google/google_bigquery.py +++ b/parsons/google/google_bigquery.py @@ -349,6 +349,7 @@ def copy_from_gcs( force_unzip_blobs: bool = False, compression_type: str = "gzip", new_file_extension: str = "csv", + template_table: str = None, **load_kwargs, ): """ @@ -404,25 +405,17 @@ def copy_from_gcs( new_file_extension: str Provides a file extension if a blob is decompressed and rewritten to cloud storage. + template_table: str + Table name to be used as the load schema. Load operation wil use the same + columns and data types as the template table. **load_kwargs: kwargs Other arguments to pass to the underlying load_table_from_uri call on the BigQuery client. """ - if if_exists not in ["fail", "truncate", "append", "drop"]: - raise ValueError( - f"Unexpected value for if_exists: {if_exists}, must be one of " - '"append", "drop", "truncate", or "fail"' - ) - if data_type not in ["csv", "json"]: - raise ValueError( - f"Only supports csv or json files [data_type = {data_type}]" - ) - - table_exists = self.table_exists(table_name) + self._validate_copy_inputs() job_config = self._process_job_config( job_config=job_config, - table_exists=table_exists, table_name=table_name, if_exists=if_exists, max_errors=max_errors, @@ -434,6 +427,7 @@ def copy_from_gcs( allow_jagged_rows=allow_jagged_rows, quote=quote, schema=schema, + template_table=template_table ) # load CSV from Cloud Storage into BigQuery @@ -441,7 +435,7 @@ def copy_from_gcs( try: if force_unzip_blobs: - load_job = self.copy_large_compressed_file_from_gcs( + return self.copy_large_compressed_file_from_gcs( gcs_blob_uri=gcs_blob_uri, table_name=table_name, if_exists=if_exists, @@ -457,15 +451,13 @@ def copy_from_gcs( compression_type=compression_type, new_file_extension=new_file_extension, ) - load_job.result() else: - load_job = self.client.load_table_from_uri( + return self._load_table_from_uri( source_uris=gcs_blob_uri, destination=table_ref, job_config=job_config, **load_kwargs, ) - load_job.result() except exceptions.BadRequest as e: if "one of the files is larger than the maximum allowed size." in str(e): logger.debug( @@ -473,7 +465,7 @@ def copy_from_gcs( running decompression function..." ) - load_job = self.copy_large_compressed_file_from_gcs( + return self.copy_large_compressed_file_from_gcs( gcs_blob_uri=gcs_blob_uri, table_name=table_name, if_exists=if_exists, @@ -493,15 +485,6 @@ def copy_from_gcs( logger.debug(f"{gcs_blob_uri.split('/')[-1]} is empty, skipping file") return "Empty file" - elif "encountered too many errors, giving up" in str(e): - # TODO - Is this TOO verbose? - logger.error(f"Max errors exceeded for {gcs_blob_uri.split('/')[-1]}") - - for error_ in load_job.errors: - logger.error(error_) - - raise e - else: raise e @@ -522,6 +505,7 @@ def copy_large_compressed_file_from_gcs( job_config: Optional[LoadJobConfig] = None, compression_type: str = "gzip", new_file_extension: str = "csv", + template_table: str = None, **load_kwargs, ): """ @@ -575,26 +559,21 @@ def copy_large_compressed_file_from_gcs( blob in cloud storage. 
new_file_extension: str Provides a file extension if a blob is decompressed and rewritten to cloud storage. + template_table: str + Table name to be used as the load schema. Load operation wil use the same + columns and data types as the template table. **load_kwargs: kwargs Other arguments to pass to the underlying load_table_from_uri call on the BigQuery client. """ - if if_exists not in ["fail", "truncate", "append", "drop"]: - raise ValueError( - f"Unexpected value for if_exists: {if_exists}, must be one of " - '"append", "drop", "truncate", or "fail"' - ) - if data_type not in ["csv", "json"]: - raise ValueError( - f"Only supports csv or json files [data_type = {data_type}]" - ) - - table_exists = self.table_exists(table_name) + self._validate_copy_inputs( + if_exists=if_exists, + data_type=data_type + ) job_config = self._process_job_config( job_config=job_config, - table_exists=table_exists, table_name=table_name, if_exists=if_exists, max_errors=max_errors, @@ -606,6 +585,7 @@ def copy_large_compressed_file_from_gcs( allow_jagged_rows=allow_jagged_rows, quote=quote, schema=schema, + template_table=template_table ) # TODO - See if this inheritance is happening in other places @@ -627,13 +607,12 @@ def copy_large_compressed_file_from_gcs( f"Loading uncompressed uri into BigQuery {uncompressed_gcs_uri}..." ) table_ref = get_table_ref(self.client, table_name) - load_job = self.client.load_table_from_uri( + return self._load_table_from_uri( source_uris=uncompressed_gcs_uri, destination=table_ref, job_config=job_config, **load_kwargs, ) - load_job.result() finally: if uncompressed_gcs_uri: @@ -643,8 +622,6 @@ def copy_large_compressed_file_from_gcs( gcs.delete_blob(new_bucket_name, new_blob_name) logger.debug("Successfully dropped uncompressed blob") - return load_job - def copy_s3( self, table_name, @@ -660,6 +637,7 @@ def copy_s3( aws_secret_access_key: Optional[str] = None, gcs_client: Optional[GoogleCloudStorage] = None, tmp_gcs_bucket: Optional[str] = None, + template_table: Optional[str] = None, job_config: Optional[LoadJobConfig] = None, **load_kwargs, ): @@ -698,6 +676,9 @@ def copy_s3( tmp_gcs_bucket: str The name of the Google Cloud Storage bucket to use to stage the data to load into BigQuery. Required if `GCS_TEMP_BUCKET` is not specified. + template_table: str + Table name to be used as the load schema. Load operation wil use the same + columns and data types as the template table. job_config: object A LoadJobConfig object to provide to the underlying call to load_table_from_uri on the BigQuery client. The function will create its own if not provided. Note @@ -726,7 +707,7 @@ def copy_s3( # load CSV from Cloud Storage into BigQuery try: - self.copy_from_gcs( + return self.copy_from_gcs( gcs_blob_uri=temp_blob_uri, table_name=table_name, if_exists=if_exists, @@ -736,6 +717,7 @@ def copy_s3( ignoreheader=ignoreheader, nullas=nullas, job_config=job_config, + template_table=template_table, **load_kwargs, ) finally: @@ -750,6 +732,7 @@ def copy( tmp_gcs_bucket: Optional[str] = None, gcs_client: Optional[GoogleCloudStorage] = None, job_config: Optional[LoadJobConfig] = None, + template_table: Optional[str] = None, **load_kwargs, ): """ @@ -774,30 +757,26 @@ def copy( job_config: object A LoadJobConfig object to provide to the underlying call to load_table_from_uri on the BigQuery client. The function will create its own if not provided. + template_table: str + Table name to be used as the load schema. Load operation wil use the same + columns and data types as the template table. 
**load_kwargs: kwargs Arguments to pass to the underlying load_table_from_uri call on the BigQuery client. """ + data_type = 'csv' tmp_gcs_bucket = check_env.check("GCS_TEMP_BUCKET", tmp_gcs_bucket) - if not job_config: - job_config = bigquery.LoadJobConfig() + self._validate_copy_inputs(if_exists=if_exists, data_type=data_type) - # It isn't ever actually necessary to generate the schema explicitly here - # BigQuery will attempt to autodetect the schema on its own - # When appending or truncating an existing table, we should not provide a schema here - # It introduces situations where provided schema can mismatch the actual schema - if not job_config.schema: - if if_exists in ("append", "truncate"): - # It is more robust to fetch the actual existing schema - # than it is to try and infer it based on provided data - try: - bigquery_table = self.client.get_table(table_name) - job_config.schema = bigquery_table.schema - except google.api_core.exceptions.NotFound: - job_config.schema = self._generate_schema_from_parsons_table(tbl) - else: - job_config.schema = self._generate_schema_from_parsons_table(tbl) + job_config = self._process_job_config( + job_config=job_config, + table_name=table_name, + if_exists=if_exists, + max_errors=max_errors, + data_type=data_type, + template_table=template_table + ) # Reorder schema to match table to ensure compatibility schema = [] @@ -814,18 +793,16 @@ def copy( job_config.schema = schema gcs_client = gcs_client or GoogleCloudStorage() - temp_blob_name = f"{uuid.uuid4()}.csv" + temp_blob_name = f"{uuid.uuid4()}.{data_type}" temp_blob_uri = gcs_client.upload_table(tbl, tmp_gcs_bucket, temp_blob_name) # load CSV from Cloud Storage into BigQuery try: - self.copy_from_gcs( - gcs_blob_uri=temp_blob_uri, - table_name=table_name, - if_exists=if_exists, - max_errors=max_errors, + self._load_table_from_uri( + source_uris=temp_blob_uri, + destination=get_table_ref(self.client, table_name), job_config=job_config, - **load_kwargs, + **load_kwargs ) finally: gcs_client.delete_blob(tmp_gcs_bucket, temp_blob_name) @@ -1136,6 +1113,38 @@ def get_row_count(self, schema: str, table_name: str) -> int: return result["row_count"][0] + def _get_job_config_schema( + self, + job_config: LoadJobConfig, + destination_table_name: str, + if_exists: str, + parsons_table: Optional[Table] = None, + custom_schema: Optional[list] = None, + template_table: Optional[str] = None + ) -> LoadJobConfig: + # if job.schema already set in job_config, do nothing + if job_config.schema: + return job_config.schema + # if schema specified by user, convert to schema type and use that + if custom_schema: + return map_column_headers_to_schema_field(custom_schema) + # if template_table specified by user, use that + # otherwise, if loading into existing table, infer destination table as template table + if not template_table and if_exists in ("append", "truncate"): + template_table = destination_table_name + # if template_table set, use it to set the load schema + if template_table: + try: + bigquery_table = self.client.get_table(template_table) + return bigquery_table.schema + except google.api_core.exceptions.NotFound: + logger.warning(f"template_table '{template_table}' not found. Unablet to set schema.") + # if load is coming from a Parsons table, use that to generate schema + if parsons_table: + return self._generate_schema_from_parsons_table(parsons_table) + + return None + def _generate_schema_from_parsons_table(self, tbl): """BigQuery schema generation based on contents of Parsons table. 
@@ -1170,7 +1179,21 @@ def _generate_schema_from_parsons_table(self, tbl): return fields def _process_job_config( - self, job_config: Optional[LoadJobConfig] = None, **kwargs + self, + destination_table_name: str, + if_exists: str, + max_errors: int, + data_type: str, + csv_delimiter: Optional[str] = ',', + ignoreheader: Optional[int] = 0, + nullas: Optional[str] = None, + allow_quoted_newlines: Optional[bool] = None, + allow_jagged_rows: Optional[bool] = None, + quote: Optional[str] = None, + job_config: Optional[LoadJobConfig] = None, + custom_schema: Optional[list] = None, + template_table: Optional[str] = None, + parsons_table: Optional[Table] = None ) -> LoadJobConfig: """ Internal function to neatly process a user-supplied job configuration object. @@ -1188,60 +1211,61 @@ def _process_job_config( if not job_config: job_config = bigquery.LoadJobConfig() + job_config.schema = self._get_job_config_schema( + job_config=job_config, + destination_table_name=destination_table_name, + if_exists=if_exists, + parsons_table=parsons_table, + custom_schema=custom_schema, + template_table=template_table + ) if not job_config.schema: - if kwargs["schema"]: - logger.debug("Using user-supplied schema definition...") - job_config.schema = map_column_headers_to_schema_field(kwargs["schema"]) - job_config.autodetect = False - else: - logger.debug("Autodetecting schema definition...") - job_config.autodetect = True + job_config.autodetect = True if not job_config.create_disposition: job_config.create_disposition = bigquery.CreateDisposition.CREATE_IF_NEEDED if not job_config.max_bad_records: - job_config.max_bad_records = kwargs["max_errors"] + job_config.max_bad_records = max_errors - if not job_config.skip_leading_rows and kwargs["data_type"] == "csv": - job_config.skip_leading_rows = kwargs["ignoreheader"] + if not job_config.skip_leading_rows and data_type == "csv": + job_config.skip_leading_rows = ignoreheader if not job_config.source_format: job_config.source_format = ( bigquery.SourceFormat.CSV - if kwargs["data_type"] == "csv" + if data_type == "csv" else bigquery.SourceFormat.NEWLINE_DELIMITED_JSON ) if not job_config.field_delimiter: - if kwargs["data_type"] == "csv": - job_config.field_delimiter = kwargs["csv_delimiter"] - if kwargs["nullas"]: - job_config.null_marker = kwargs["nullas"] + if data_type == "csv": + job_config.field_delimiter = csv_delimiter + if nullas: + job_config.null_marker = nullas + destination_table_exists = self.table_exists(destination_table_name) if not job_config.write_disposition: - if kwargs["if_exists"] == "append": + if if_exists == "append": job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND - elif kwargs["if_exists"] == "truncate": + elif if_exists == "truncate": job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE - elif kwargs["table_exists"] and kwargs["if_exists"] == "fail": + elif destination_table_exists and if_exists == "fail": raise Exception("Table already exists.") - elif kwargs["if_exists"] == "drop" and kwargs["table_exists"]: - self.delete_table(kwargs["table_name"]) + elif if_exists == "drop" and destination_table_exists: + self.delete_table(destination_table_name) job_config.write_disposition = bigquery.WriteDisposition.WRITE_EMPTY else: job_config.write_disposition = bigquery.WriteDisposition.WRITE_EMPTY - if not job_config.allow_quoted_newlines: - job_config.allow_quoted_newlines = kwargs["allow_quoted_newlines"] + if not job_config.allow_quoted_newlines and allow_quoted_newlines is not None: + 
job_config.allow_quoted_newlines = allow_quoted_newlines - if kwargs["data_type"] == "csv" and kwargs["allow_jagged_rows"]: - job_config.allow_jagged_rows = kwargs["allow_jagged_rows"] - else: - job_config.allow_jagged_rows = True + if data_type == "csv" and allow_jagged_rows is not None: + job_config.allow_jagged_rows = allow_jagged_rows - if not job_config.quote_character and kwargs["quote"]: - job_config.quote_character = kwargs["quote"] + if not job_config.quote_character and quote is not None: + job_config.quote_character = quote return job_config @@ -1267,6 +1291,42 @@ def _fetch_query_results(self, cursor) -> Table: ptable = petl.frompickle(temp_filename) return Table(ptable) + def _validate_copy_inputs(if_exists: str, data_type: str): + if if_exists not in ["fail", "truncate", "append", "drop"]: + raise ValueError( + f"Unexpected value for if_exists: {if_exists}, must be one of " + '"append", "drop", "truncate", or "fail"' + ) + if data_type not in ["csv", "json"]: + raise ValueError( + f"Only supports csv or json files [data_type = {data_type}]" + ) + + def _load_table_from_uri( + self, + source_uris, + destination, + job_config, + **load_kwargs + ): + try: + load_job = self.client.load_table_from_uri( + source_uris=source_uris, + destination=destination, + job_config=job_config, + **load_kwargs, + ) + load_job.result() + return load_job + except exceptions.BadRequest as e: + for idx, error_ in enumerate(load_job.errors): + if idx == 0: + logger.error('* Load job failed. Enumerating errors collection below:') + logger.error(f"** Error collection - index {idx}:") + logger.error(error_) + + raise e + @staticmethod def _bigquery_type(tp): return BIGQUERY_TYPE_MAP[tp] From 04e4a90f45e3a7a73222f5eb7de0e033a3c5f056 Mon Sep 17 00:00:00 2001 From: Willy Raedy Date: Mon, 19 Feb 2024 15:29:02 -0600 Subject: [PATCH 2/8] fix tests and param renames --- parsons/google/google_bigquery.py | 52 ++++++++++++++++------------ test/test_databases/test_bigquery.py | 27 +++++++++------ 2 files changed, 45 insertions(+), 34 deletions(-) diff --git a/parsons/google/google_bigquery.py b/parsons/google/google_bigquery.py index a84cbe6e76..ddbbee6b65 100644 --- a/parsons/google/google_bigquery.py +++ b/parsons/google/google_bigquery.py @@ -42,13 +42,6 @@ QUERY_BATCH_SIZE = 100000 -def get_table_ref(client, table_name): - # Helper function to build a TableReference for our table - parsed = parse_table_name(table_name) - dataset_ref = client.dataset(parsed["dataset"]) - return dataset_ref.table(parsed["table"]) - - def parse_table_name(table_name): # Helper function to parse out the different components of a table ID parts = table_name.split(".") @@ -359,7 +352,9 @@ def copy_from_gcs( gcs_blob_uri: str The GoogleCloudStorage URI referencing the file to be copied. table_name: str - The table name to load the data into. + The table name to load the data into. Will be used to generate load schema + if no custom schema or template table are supplied and the if_exists is + set to "truncate" or "append". if_exists: str If the table already exists, either ``fail``, ``append``, ``drop`` or ``truncate`` the table. This maps to `write_disposition` in the @@ -412,11 +407,11 @@ def copy_from_gcs( Other arguments to pass to the underlying load_table_from_uri call on the BigQuery client. 
""" - self._validate_copy_inputs() + self._validate_copy_inputs(if_exists=if_exists, data_type=data_type) job_config = self._process_job_config( job_config=job_config, - table_name=table_name, + destination_table_name=table_name, if_exists=if_exists, max_errors=max_errors, data_type=data_type, @@ -426,12 +421,12 @@ def copy_from_gcs( allow_quoted_newlines=allow_quoted_newlines, allow_jagged_rows=allow_jagged_rows, quote=quote, - schema=schema, + custom_schema=schema, template_table=template_table ) # load CSV from Cloud Storage into BigQuery - table_ref = get_table_ref(self.client, table_name) + table_ref = self.get_table_ref(table_name=table_name) try: if force_unzip_blobs: @@ -516,7 +511,9 @@ def copy_large_compressed_file_from_gcs( gcs_blob_uri: str The GoogleCloudStorage URI referencing the file to be copied. table_name: str - The table name to load the data into. + The table name to load the data into. Will be used to generate load schema + if no custom schema or template table are supplied and the if_exists is + set to "truncate" or "append". if_exists: str If the table already exists, either ``fail``, ``append``, ``drop`` or ``truncate`` the table. This maps to `write_disposition` in the @@ -574,7 +571,7 @@ def copy_large_compressed_file_from_gcs( job_config = self._process_job_config( job_config=job_config, - table_name=table_name, + destination_table_name=table_name, if_exists=if_exists, max_errors=max_errors, data_type=data_type, @@ -584,7 +581,7 @@ def copy_large_compressed_file_from_gcs( allow_quoted_newlines=allow_quoted_newlines, allow_jagged_rows=allow_jagged_rows, quote=quote, - schema=schema, + custom_schema=schema, template_table=template_table ) @@ -606,7 +603,7 @@ def copy_large_compressed_file_from_gcs( logger.debug( f"Loading uncompressed uri into BigQuery {uncompressed_gcs_uri}..." ) - table_ref = get_table_ref(self.client, table_name) + table_ref = self.get_table_ref(table_name=table_name) return self._load_table_from_uri( source_uris=uncompressed_gcs_uri, destination=table_ref, @@ -742,7 +739,9 @@ def copy( tbl: obj The Parsons Table to copy into BigQuery. table_name: str - The table name to load the data into. + The table name to load the data into. Will be used to generate load schema + if no custom schema or template table are supplied and if_exists is + set to "truncate" or "append". if_exists: str If the table already exists, either ``fail``, ``append``, ``drop`` or ``truncate`` the table. @@ -771,11 +770,12 @@ def copy( job_config = self._process_job_config( job_config=job_config, - table_name=table_name, + destination_table_name=table_name, if_exists=if_exists, max_errors=max_errors, data_type=data_type, - template_table=template_table + template_table=template_table, + parsons_table=tbl ) # Reorder schema to match table to ensure compatibility @@ -800,7 +800,7 @@ def copy( try: self._load_table_from_uri( source_uris=temp_blob_uri, - destination=get_table_ref(self.client, table_name), + destination=self.get_table_ref(table_name=table_name), job_config=job_config, **load_kwargs ) @@ -970,7 +970,7 @@ def delete_table(self, table_name): table_name: str The name of the table to delete. 
""" - table_ref = get_table_ref(self.client, table_name) + table_ref = self.get_table_ref(table_name=table_name) self.client.delete_table(table_ref) def table_exists(self, table_name: str) -> bool: @@ -984,7 +984,7 @@ def table_exists(self, table_name: str) -> bool: bool True if the table exists in the specified dataset, false otherwise """ - table_ref = get_table_ref(self.client, table_name) + table_ref = self.get_table_ref(table_name=table_name) try: self.client.get_table(table_ref) except exceptions.NotFound: @@ -1113,6 +1113,12 @@ def get_row_count(self, schema: str, table_name: str) -> int: return result["row_count"][0] + def get_table_ref(self, table_name): + # Helper function to build a TableReference for our table + parsed = parse_table_name(table_name) + dataset_ref = self.client.dataset(parsed["dataset"]) + return dataset_ref.table(parsed["table"]) + def _get_job_config_schema( self, job_config: LoadJobConfig, @@ -1291,7 +1297,7 @@ def _fetch_query_results(self, cursor) -> Table: ptable = petl.frompickle(temp_filename) return Table(ptable) - def _validate_copy_inputs(if_exists: str, data_type: str): + def _validate_copy_inputs(self, if_exists: str, data_type: str): if if_exists not in ["fail", "truncate", "append", "drop"]: raise ValueError( f"Unexpected value for if_exists: {if_exists}, must be one of " diff --git a/test/test_databases/test_bigquery.py b/test/test_databases/test_bigquery.py index 3522d582a7..3b991d2c86 100644 --- a/test/test_databases/test_bigquery.py +++ b/test/test_databases/test_bigquery.py @@ -338,8 +338,9 @@ def test_copy(self): gcs_client = self._build_mock_cloud_storage_client(tmp_blob_uri) tbl = self.default_table bq = self._build_mock_client_for_copying(table_exists=False) - bq.copy_from_gcs = mock.MagicMock() - table_name = ("dataset.table",) + bq._load_table_from_uri = mock.MagicMock() + bq.get_table_ref = mock.Mock(wraps=bq.get_table_ref) + table_name = "dataset.table" # call the method being tested bq.copy( @@ -356,13 +357,16 @@ def test_copy(self): self.assertEqual(upload_call_args[0][1], self.tmp_gcs_bucket) tmp_blob_name = upload_call_args[0][2] - self.assertEqual(bq.copy_from_gcs.call_count, 1) - load_call_args = bq.copy_from_gcs.call_args - job_config = bq.copy_from_gcs.call_args[1]["job_config"] + self.assertEqual(bq._load_table_from_uri.call_count, 1) + load_call_args = bq._load_table_from_uri.call_args + job_config = load_call_args[1]["job_config"] column_types = [schema_field.field_type for schema_field in job_config.schema] self.assertEqual(column_types, ["INTEGER", "STRING", "BOOLEAN"]) - self.assertEqual(load_call_args[1]["gcs_blob_uri"], tmp_blob_uri) - self.assertEqual(load_call_args[1]["table_name"], table_name) + self.assertEqual(load_call_args[1]["source_uris"], tmp_blob_uri) + + self.assertEqual(bq.get_table_ref.call_count, 2) + get_table_ref_args = bq.get_table_ref.call_args + self.assertEqual(get_table_ref_args[1]["table_name"], table_name) # make sure we cleaned up the temp file self.assertEqual(gcs_client.delete_blob.call_count, 1) @@ -398,7 +402,8 @@ def test_copy__if_exists_passed_through(self): gcs_client = self._build_mock_cloud_storage_client(tmp_blob_uri) tbl = self.default_table bq = self._build_mock_client_for_copying(table_exists=False) - bq.copy_from_gcs = mock.MagicMock() + bq._load_table_from_uri = mock.MagicMock() + bq._process_job_config = mock.Mock(wraps=bq._process_job_config) table_name = "dataset.table" if_exists = "drop" @@ -411,9 +416,9 @@ def test_copy__if_exists_passed_through(self): if_exists=if_exists, ) 
- self.assertEqual(bq.copy_from_gcs.call_count, 1) - load_call_args = bq.copy_from_gcs.call_args - self.assertEqual(load_call_args[1]["if_exists"], if_exists) + self.assertEqual(bq._load_table_from_uri.call_count, 1) + process_job_config_args = bq._process_job_config.call_args + self.assertEqual(process_job_config_args[1]["if_exists"], if_exists) @mock.patch.object(BigQuery, "table_exists", return_value=False) @mock.patch.object(BigQuery, "query", return_value=None) From 27a7b5657d77bf2f6f7430468088057a75e8721d Mon Sep 17 00:00:00 2001 From: Willy Raedy Date: Mon, 19 Feb 2024 16:09:19 -0600 Subject: [PATCH 3/8] fix upsert query --- parsons/google/google_bigquery.py | 29 ++++++++++++++--------------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/parsons/google/google_bigquery.py b/parsons/google/google_bigquery.py index ddbbee6b65..adde52b28a 100644 --- a/parsons/google/google_bigquery.py +++ b/parsons/google/google_bigquery.py @@ -934,33 +934,32 @@ def upsert( **copy_args, ) - staging_table_name = staging_tbl.split(".")[1] - target_table_name = target_table.split(".")[1] - # Delete rows comparisons = [ - f"{staging_table_name}.{primary_key} = {target_table_name}.{primary_key}" + f"`{staging_tbl}`.{primary_key} = `{target_table}`.{primary_key}" for primary_key in primary_keys ] where_clause = " and ".join(comparisons) queries = [ f""" - DELETE FROM {target_table} - USING {staging_tbl} - WHERE {where_clause} + DELETE FROM `{target_table}` + WHERE EXISTS + (SELECT * FROM `{staging_tbl}` + WHERE {where_clause}) """, f""" - INSERT INTO {target_table} - SELECT * FROM {staging_tbl} + INSERT INTO `{target_table}` + SELECT * FROM `{staging_tbl}` """, ] - if cleanup_temp_table: - # Drop the staging table - queries.append(f"DROP TABLE IF EXISTS {staging_tbl}") - - return self.query_with_transaction(queries=queries) + try: + return self.query_with_transaction(queries=queries) + finally: + if cleanup_temp_table: + logger.info(f'Deleting staging table: {staging_tbl}') + self.query(f"DROP TABLE IF EXISTS {staging_tbl}", return_values=False) def delete_table(self, table_name): """ @@ -1191,7 +1190,7 @@ def _process_job_config( max_errors: int, data_type: str, csv_delimiter: Optional[str] = ',', - ignoreheader: Optional[int] = 0, + ignoreheader: Optional[int] = 1, nullas: Optional[str] = None, allow_quoted_newlines: Optional[bool] = None, allow_jagged_rows: Optional[bool] = None, From 8d97a4a19a5083a4cb5fc96dd0ccad1dc1be7f8b Mon Sep 17 00:00:00 2001 From: Willy Raedy Date: Mon, 19 Feb 2024 16:35:01 -0600 Subject: [PATCH 4/8] formatting --- parsons/google/google_bigquery.py | 8 +++++--- test/test_databases/test_bigquery.py | 2 +- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/parsons/google/google_bigquery.py b/parsons/google/google_bigquery.py index adde52b28a..bd688125a9 100644 --- a/parsons/google/google_bigquery.py +++ b/parsons/google/google_bigquery.py @@ -1117,7 +1117,7 @@ def get_table_ref(self, table_name): parsed = parse_table_name(table_name) dataset_ref = self.client.dataset(parsed["dataset"]) return dataset_ref.table(parsed["table"]) - + def _get_job_config_schema( self, job_config: LoadJobConfig, @@ -1143,7 +1143,9 @@ def _get_job_config_schema( bigquery_table = self.client.get_table(template_table) return bigquery_table.schema except google.api_core.exceptions.NotFound: - logger.warning(f"template_table '{template_table}' not found. Unablet to set schema.") + logger.warning( + f"template_table '{template_table}' not found. Unable to set schema." 
+ ) # if load is coming from a Parsons table, use that to generate schema if parsons_table: return self._generate_schema_from_parsons_table(parsons_table) @@ -1331,7 +1333,7 @@ def _load_table_from_uri( logger.error(error_) raise e - + @staticmethod def _bigquery_type(tp): return BIGQUERY_TYPE_MAP[tp] diff --git a/test/test_databases/test_bigquery.py b/test/test_databases/test_bigquery.py index 3b991d2c86..353ea4115f 100644 --- a/test/test_databases/test_bigquery.py +++ b/test/test_databases/test_bigquery.py @@ -363,7 +363,7 @@ def test_copy(self): column_types = [schema_field.field_type for schema_field in job_config.schema] self.assertEqual(column_types, ["INTEGER", "STRING", "BOOLEAN"]) self.assertEqual(load_call_args[1]["source_uris"], tmp_blob_uri) - + self.assertEqual(bq.get_table_ref.call_count, 2) get_table_ref_args = bq.get_table_ref.call_args self.assertEqual(get_table_ref_args[1]["table_name"], table_name) From 52435af16ffd63a1f3215aebce7c2962eb6b9dae Mon Sep 17 00:00:00 2001 From: Willy Raedy Date: Mon, 19 Feb 2024 16:47:44 -0600 Subject: [PATCH 5/8] black fix --- parsons/google/google_bigquery.py | 37 +++++++++++++------------------ 1 file changed, 15 insertions(+), 22 deletions(-) diff --git a/parsons/google/google_bigquery.py b/parsons/google/google_bigquery.py index bd688125a9..c76da2b1b6 100644 --- a/parsons/google/google_bigquery.py +++ b/parsons/google/google_bigquery.py @@ -422,7 +422,7 @@ def copy_from_gcs( allow_jagged_rows=allow_jagged_rows, quote=quote, custom_schema=schema, - template_table=template_table + template_table=template_table, ) # load CSV from Cloud Storage into BigQuery @@ -564,10 +564,7 @@ def copy_large_compressed_file_from_gcs( client. """ - self._validate_copy_inputs( - if_exists=if_exists, - data_type=data_type - ) + self._validate_copy_inputs(if_exists=if_exists, data_type=data_type) job_config = self._process_job_config( job_config=job_config, @@ -582,7 +579,7 @@ def copy_large_compressed_file_from_gcs( allow_jagged_rows=allow_jagged_rows, quote=quote, custom_schema=schema, - template_table=template_table + template_table=template_table, ) # TODO - See if this inheritance is happening in other places @@ -763,7 +760,7 @@ def copy( Arguments to pass to the underlying load_table_from_uri call on the BigQuery client. 
""" - data_type = 'csv' + data_type = "csv" tmp_gcs_bucket = check_env.check("GCS_TEMP_BUCKET", tmp_gcs_bucket) self._validate_copy_inputs(if_exists=if_exists, data_type=data_type) @@ -775,7 +772,7 @@ def copy( max_errors=max_errors, data_type=data_type, template_table=template_table, - parsons_table=tbl + parsons_table=tbl, ) # Reorder schema to match table to ensure compatibility @@ -802,7 +799,7 @@ def copy( source_uris=temp_blob_uri, destination=self.get_table_ref(table_name=table_name), job_config=job_config, - **load_kwargs + **load_kwargs, ) finally: gcs_client.delete_blob(tmp_gcs_bucket, temp_blob_name) @@ -958,7 +955,7 @@ def upsert( return self.query_with_transaction(queries=queries) finally: if cleanup_temp_table: - logger.info(f'Deleting staging table: {staging_tbl}') + logger.info(f"Deleting staging table: {staging_tbl}") self.query(f"DROP TABLE IF EXISTS {staging_tbl}", return_values=False) def delete_table(self, table_name): @@ -1125,7 +1122,7 @@ def _get_job_config_schema( if_exists: str, parsons_table: Optional[Table] = None, custom_schema: Optional[list] = None, - template_table: Optional[str] = None + template_table: Optional[str] = None, ) -> LoadJobConfig: # if job.schema already set in job_config, do nothing if job_config.schema: @@ -1191,7 +1188,7 @@ def _process_job_config( if_exists: str, max_errors: int, data_type: str, - csv_delimiter: Optional[str] = ',', + csv_delimiter: Optional[str] = ",", ignoreheader: Optional[int] = 1, nullas: Optional[str] = None, allow_quoted_newlines: Optional[bool] = None, @@ -1200,7 +1197,7 @@ def _process_job_config( job_config: Optional[LoadJobConfig] = None, custom_schema: Optional[list] = None, template_table: Optional[str] = None, - parsons_table: Optional[Table] = None + parsons_table: Optional[Table] = None, ) -> LoadJobConfig: """ Internal function to neatly process a user-supplied job configuration object. @@ -1224,7 +1221,7 @@ def _process_job_config( if_exists=if_exists, parsons_table=parsons_table, custom_schema=custom_schema, - template_table=template_table + template_table=template_table, ) if not job_config.schema: job_config.autodetect = True @@ -1309,13 +1306,7 @@ def _validate_copy_inputs(self, if_exists: str, data_type: str): f"Only supports csv or json files [data_type = {data_type}]" ) - def _load_table_from_uri( - self, - source_uris, - destination, - job_config, - **load_kwargs - ): + def _load_table_from_uri(self, source_uris, destination, job_config, **load_kwargs): try: load_job = self.client.load_table_from_uri( source_uris=source_uris, @@ -1328,7 +1319,9 @@ def _load_table_from_uri( except exceptions.BadRequest as e: for idx, error_ in enumerate(load_job.errors): if idx == 0: - logger.error('* Load job failed. Enumerating errors collection below:') + logger.error( + "* Load job failed. 
Enumerating errors collection below:" + ) logger.error(f"** Error collection - index {idx}:") logger.error(error_) From 6a24469fb82badeefcb8a6d8036e0d05d2ebcdf9 Mon Sep 17 00:00:00 2001 From: Willy Raedy Date: Wed, 21 Feb 2024 10:57:37 -0600 Subject: [PATCH 6/8] fix fn signature --- parsons/google/google_bigquery.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parsons/google/google_bigquery.py b/parsons/google/google_bigquery.py index c76da2b1b6..7f96a5e002 100644 --- a/parsons/google/google_bigquery.py +++ b/parsons/google/google_bigquery.py @@ -1123,7 +1123,7 @@ def _get_job_config_schema( parsons_table: Optional[Table] = None, custom_schema: Optional[list] = None, template_table: Optional[str] = None, - ) -> LoadJobConfig: + ) -> Optional[List[bigquery.SchemaField]]: # if job.schema already set in job_config, do nothing if job_config.schema: return job_config.schema From c2e30d3399dffcaa1bf7b70409c8bcd3688d6597 Mon Sep 17 00:00:00 2001 From: Kasia Hinkson <52927664+KasiaHinkson@users.noreply.github.com> Date: Tue, 27 Feb 2024 09:48:18 -0600 Subject: [PATCH 7/8] Add job config kwargs to copy --- parsons/google/google_bigquery.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/parsons/google/google_bigquery.py b/parsons/google/google_bigquery.py index 7f96a5e002..b410198b59 100644 --- a/parsons/google/google_bigquery.py +++ b/parsons/google/google_bigquery.py @@ -727,6 +727,12 @@ def copy( gcs_client: Optional[GoogleCloudStorage] = None, job_config: Optional[LoadJobConfig] = None, template_table: Optional[str] = None, + ignoreheader: int = 1, + nullas: Optional[str] = None, + allow_quoted_newlines: bool = True, + allow_jagged_rows: bool = True, + quote: Optional[str] = None, + schema: Optional[List[dict]] = None, **load_kwargs, ): """ @@ -773,6 +779,12 @@ def copy( data_type=data_type, template_table=template_table, parsons_table=tbl, + ignoreheader=ignoreheader, + nullas=nullas, + allow_quoted_newlines=allow_quoted_newlines, + allow_jagged_rows=allow_jagged_rows, + quote=quote, + custom_schema=schema, ) # Reorder schema to match table to ensure compatibility From 3c1cd7162da744d6cb245f38d3251ff193a7a611 Mon Sep 17 00:00:00 2001 From: Kasia Hinkson <52927664+KasiaHinkson@users.noreply.github.com> Date: Tue, 27 Feb 2024 11:25:15 -0600 Subject: [PATCH 8/8] template table optional --- parsons/google/google_bigquery.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/parsons/google/google_bigquery.py b/parsons/google/google_bigquery.py index b410198b59..e341db31a8 100644 --- a/parsons/google/google_bigquery.py +++ b/parsons/google/google_bigquery.py @@ -342,7 +342,7 @@ def copy_from_gcs( force_unzip_blobs: bool = False, compression_type: str = "gzip", new_file_extension: str = "csv", - template_table: str = None, + template_table: Optional[str] = None, **load_kwargs, ): """ @@ -500,7 +500,7 @@ def copy_large_compressed_file_from_gcs( job_config: Optional[LoadJobConfig] = None, compression_type: str = "gzip", new_file_extension: str = "csv", - template_table: str = None, + template_table: Optional[str] = None, **load_kwargs, ): """
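
Usage sketch for the `template_table` argument this series threads through `copy`, `copy_s3`, `copy_from_gcs`, and `copy_large_compressed_file_from_gcs`: a minimal example assuming the patched connector, configured Google credentials, and a staging bucket (via `GCS_TEMP_BUCKET` or `tmp_gcs_bucket`); the dataset, table, and bucket names below are illustrative, not taken from the patches.

    from parsons import Table
    from parsons.google.google_bigquery import BigQuery

    bq = BigQuery()

    # Borrow column names and types from an existing table instead of relying
    # on BigQuery schema autodetection for the load job. Table/bucket names
    # here are hypothetical placeholders.
    tbl = Table([{"id": 1, "name": "Jane"}, {"id": 2, "name": "Ali"}])
    bq.copy(
        tbl,
        table_name="my_dataset.daily_load",
        if_exists="append",
        tmp_gcs_bucket="my-temp-bucket",
        template_table="my_dataset.daily_load_template",
    )

    # The same option applies when loading directly from Cloud Storage.
    bq.copy_from_gcs(
        gcs_blob_uri="gs://my-temp-bucket/export.csv",
        table_name="my_dataset.daily_load",
        if_exists="append",
        template_table="my_dataset.daily_load_template",
    )

When no template table or custom schema is supplied and `if_exists` is "append" or "truncate", the refactored `_get_job_config_schema` falls back to the destination table itself as the template, and only then to a Parsons-table-derived or autodetected schema.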