From 114a7d133f699988b2450ac70d0e7d6b07e05620 Mon Sep 17 00:00:00 2001 From: Mike Moore Date: Mon, 1 Jan 2024 11:03:16 +0000 Subject: [PATCH 1/3] feat: handle scalar sql functions in bqsync that reference only dataset level functions that fail via api but work via DDL statements --- src/bqtools/__init__.py | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/src/bqtools/__init__.py b/src/bqtools/__init__.py index 9603230..c129d0b 100644 --- a/src/bqtools/__init__.py +++ b/src/bqtools/__init__.py @@ -5822,6 +5822,33 @@ def create_destination_routine(copy_driver, routine_name, routine_input): copy_driver.get_logger().exception( f"Unable to create routine {routine_name} in {copy_driver.destination_project}.{copy_driver.destination_dataset} definition {routine_input['routine_definition']}" ) + if dstroutine_ref.type_ == "SCALAR_FUNCTION" and dstroutine_ref.language == "SQL": + copy_driver.get_logger().info(f"As scalar function and SQL attempting adding as query") + function_as_query = f"""CREATE OR REPLACE FUNCTION `{dstroutine_ref.project}.{dstroutine_ref.dataset_id}.{dstroutine_ref.routine_id}` ({",".join([arg.name + " " + arg.data_type.type_kind for arg in dstroutine_ref.arguments])}) AS +({dstroutine_ref.body}) +{ "RETURNS " + dstroutine_ref.return_type.type_kind if dstroutine_ref.return_type else ""} +OPTIONS (description="{dstroutine_ref.description if dstroutine_ref.description else ""}")""" + try: + for result in run_query( + copy_driver.query_client, + function_as_query, + copy_driver.get_logger(), + "Apply SQL scalar function", + location=copy_driver.destination_location, + callback_on_complete=copy_driver.update_job_stats, + labels=BQSYNCQUERYLABELS, + # ddl statements cannot use CMEK + query_cmek=None, + ): + pass + copy_driver.get_logger().info(f"Running as query did work function {routine_name} in {copy_driver.destination_project}.{copy_driver.destination_dataset} created") + except Exception: + copy_driver.increment_routines_failed_sync() + copy_driver.get_logger().exception( + f"Unable to create routine using query {routine_name} in {copy_driver.destination_project}.{copy_driver.destination_dataset} definition {routine_input['routine_definition']}" + ) + else: + copy_driver.increment_routines_failed_sync() return return dstroutine else: From 01e6c3e655c4a446173c35aecafe572d7d15712e Mon Sep 17 00:00:00 2001 From: Mike Moore Date: Mon, 1 Jan 2024 11:05:30 +0000 Subject: [PATCH 2/3] feat: handle scalar sql functions in bqsync that reference only dataset level functions that fail via api but work via DDL statements --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index fd9459b..8e17439 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ - Updates and improve testing and add tox actions on pull requests - Update dependencies (dependabot) absl-py and bigquery - Enhance head views to include streaming buffer +- For scalar SQL routines when syncing if fail with Bad Request attempt to use DDL instead of API to apply 1.0.5 April 10th 2023 - Adjust jinja dependency so will allow Jina2 version3 From fd7dbb65a107bc3e6348446c22e3085d9183148f Mon Sep 17 00:00:00 2001 From: Mike Moore Date: Mon, 1 Jan 2024 11:11:55 +0000 Subject: [PATCH 3/3] feat: handle scalar sql functions in bqsync that reference only dataset level functions that fail via api but work via DDL statements --- src/bqtools/__init__.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/bqtools/__init__.py b/src/bqtools/__init__.py index c129d0b..f7adac2 100644 --- a/src/bqtools/__init__.py +++ b/src/bqtools/__init__.py @@ -5823,10 +5823,10 @@ def create_destination_routine(copy_driver, routine_name, routine_input): f"Unable to create routine {routine_name} in {copy_driver.destination_project}.{copy_driver.destination_dataset} definition {routine_input['routine_definition']}" ) if dstroutine_ref.type_ == "SCALAR_FUNCTION" and dstroutine_ref.language == "SQL": - copy_driver.get_logger().info(f"As scalar function and SQL attempting adding as query") + copy_driver.get_logger().info("As scalar function and SQL attempting adding as query") function_as_query = f"""CREATE OR REPLACE FUNCTION `{dstroutine_ref.project}.{dstroutine_ref.dataset_id}.{dstroutine_ref.routine_id}` ({",".join([arg.name + " " + arg.data_type.type_kind for arg in dstroutine_ref.arguments])}) AS ({dstroutine_ref.body}) -{ "RETURNS " + dstroutine_ref.return_type.type_kind if dstroutine_ref.return_type else ""} +{"RETURNS " + dstroutine_ref.return_type.type_kind if dstroutine_ref.return_type else ""} OPTIONS (description="{dstroutine_ref.description if dstroutine_ref.description else ""}")""" try: for result in run_query( @@ -5839,7 +5839,7 @@ def create_destination_routine(copy_driver, routine_name, routine_input): labels=BQSYNCQUERYLABELS, # ddl statements cannot use CMEK query_cmek=None, - ): + ): pass copy_driver.get_logger().info(f"Running as query did work function {routine_name} in {copy_driver.destination_project}.{copy_driver.destination_dataset} created") except Exception: