implement gcs_schema_object for BigQueryCreateExternalTableOperator #30961

Merged
13 changes: 11 additions & 2 deletions airflow/providers/google/cloud/operators/bigquery.py
@@ -55,7 +55,6 @@
 from airflow.models.taskinstancekey import TaskInstanceKey
 from airflow.utils.context import Context
 
-
 BIGQUERY_JOB_DETAILS_LINK_FMT = "https://console.cloud.google.com/bigquery?j={job_id}"
 
 
@@ -1434,6 +1433,8 @@ class BigQueryCreateExternalTableOperator(GoogleCloudBaseOperator):
         If provided all other parameters are ignored. External schema from object will be resolved.
     :param schema_object: If set, a GCS object path pointing to a .json file that
         contains the schema for the table. (templated)
+    :param gcs_schema_bucket: GCS bucket name where the schema JSON is stored (templated).
+        The default value is self.bucket.
     :param source_format: File format of the data.
     :param autodetect: Try to detect schema and format options automatically.
         The schema_fields and schema_object options will be honored when specified explicitly.
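For context, here is a minimal usage sketch of the operator with the new parameter. The bucket names, object paths, and destination table are hypothetical placeholders, not values from this PR:

```python
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryCreateExternalTableOperator,
)

# Hypothetical setup: the data files live in one bucket, while the
# schema JSON is kept in a separate bucket.
create_external_table = BigQueryCreateExternalTableOperator(
    task_id="create_external_table",
    bucket="my-data-bucket",  # bucket holding the source data files
    source_objects=["data/2023/*.csv"],
    source_format="CSV",
    destination_project_dataset_table="my-project.my_dataset.my_table",
    schema_object="schemas/my_table.json",  # object path to the schema file
    gcs_schema_bucket="my-schema-bucket",  # new: bucket holding the schema JSON
)
```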
@@ -1481,6 +1482,7 @@ class BigQueryCreateExternalTableOperator(GoogleCloudBaseOperator):
         "bucket",
         "source_objects",
         "schema_object",
+        "gcs_schema_bucket",
         "destination_project_dataset_table",
         "labels",
         "table_resource",
@@ -1499,6 +1501,7 @@ def __init__(
         table_resource: dict[str, Any] | None = None,
         schema_fields: list | None = None,
         schema_object: str | None = None,
+        gcs_schema_bucket: str | None = None,
         source_format: str | None = None,
         autodetect: bool = False,
         compression: str | None = None,
@@ -1557,6 +1560,8 @@ def __init__(
             )
             if not bucket:
                 raise ValueError("`bucket` is required when not using `table_resource`.")
+            if not gcs_schema_bucket:
+                gcs_schema_bucket = bucket
             if not source_objects:
                 raise ValueError("`source_objects` is required when not using `table_resource`.")
             if not source_format:
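The fallback above keeps existing DAGs working unchanged: when `gcs_schema_bucket` is omitted, the schema is still read from `bucket`. A small sketch of that default, with placeholder values:

```python
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryCreateExternalTableOperator,
)

# When gcs_schema_bucket is not passed, __init__ falls back to `bucket`,
# so the schema object is downloaded from the data bucket as before.
op = BigQueryCreateExternalTableOperator(
    task_id="compat_check",
    bucket="my-data-bucket",
    source_objects=["data.csv"],
    source_format="CSV",
    destination_project_dataset_table="my-project.my_dataset.my_table",
    schema_object="schema.json",
)
assert op.gcs_schema_bucket == "my-data-bucket"
```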
@@ -1574,6 +1579,7 @@ def __init__(
             self.bucket = bucket
             self.source_objects = source_objects
             self.schema_object = schema_object
+            self.gcs_schema_bucket = gcs_schema_bucket
             self.destination_project_dataset_table = destination_project_dataset_table
             self.schema_fields = schema_fields
             self.source_format = source_format
@@ -1586,6 +1592,7 @@ def __init__(
             self.bucket = ""
             self.source_objects = []
             self.schema_object = None
+            self.gcs_schema_bucket = ""
             self.destination_project_dataset_table = ""
 
         if table_resource and kwargs_passed:
@@ -1629,7 +1636,9 @@ def execute(self, context: Context) -> None:
                 gcp_conn_id=self.google_cloud_storage_conn_id,
                 impersonation_chain=self.impersonation_chain,
             )
-            schema_fields = json.loads(gcs_hook.download(self.bucket, self.schema_object).decode("utf-8"))
+            schema_fields = json.loads(
+                gcs_hook.download(self.gcs_schema_bucket, self.schema_object).decode("utf-8")
+            )
         else:
             schema_fields = self.schema_fields
 
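Since the downloaded bytes go straight through `json.loads()` and are used as `schema_fields`, the object that `schema_object` points to must contain a JSON list of BigQuery field definitions. A sketch of what such a file holds, with illustrative field names:

```python
import json

# Illustrative content of gs://<gcs_schema_bucket>/<schema_object>; this is
# the standard BigQuery schema-field shape. The two lines below mirror what
# execute() does with the downloaded bytes.
downloaded = b'[{"name": "id", "type": "INTEGER", "mode": "REQUIRED"}, {"name": "name", "type": "STRING", "mode": "NULLABLE"}]'
schema_fields = json.loads(downloaded.decode("utf-8"))
assert schema_fields[0] == {"name": "id", "type": "INTEGER", "mode": "REQUIRED"}
```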
1 change: 1 addition & 0 deletions tests/providers/google/cloud/operators/test_bigquery.py
@@ -237,6 +237,7 @@ def test_execute(self, mock_hook):
             destination_project_dataset_table=f"{TEST_GCP_PROJECT_ID}.{TEST_DATASET}.{TEST_TABLE_ID}",
             schema_fields=[],
             bucket=TEST_GCS_BUCKET,
+            gcs_schema_bucket=TEST_GCS_BUCKET,
             source_objects=TEST_GCS_DATA,
             source_format=TEST_SOURCE_FORMAT,
             autodetect=True,
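Beyond mirroring `bucket` in the existing test, the new behavior could be pinned down with a dedicated assertion. A sketch of such a test (not part of this PR), following the mock pattern already used in this file; all values are placeholders:

```python
from unittest import mock

from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryCreateExternalTableOperator,
)


@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
@mock.patch("airflow.providers.google.cloud.operators.bigquery.GCSHook")
def test_schema_read_from_gcs_schema_bucket(mock_gcs_hook, mock_bq_hook):
    # Placeholder values; a real test would reuse the TEST_* fixtures above.
    mock_gcs_hook.return_value.download.return_value = b"[]"
    mock_bq_hook.return_value.split_tablename.return_value = (
        "test-project",
        "test_dataset",
        "test_table",
    )
    operator = BigQueryCreateExternalTableOperator(
        task_id="create_external_table",
        destination_project_dataset_table="test-project.test_dataset.test_table",
        bucket="data-bucket",
        gcs_schema_bucket="schema-bucket",
        schema_object="schema.json",
        source_objects=["data.csv"],
        source_format="CSV",
    )
    operator.execute(context=mock.MagicMock())
    # The schema must be fetched from the dedicated schema bucket, not `bucket`.
    mock_gcs_hook.return_value.download.assert_called_once_with(
        "schema-bucket", "schema.json"
    )
```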