Skip to content

Commit

Permalink
Add retry factory to consolidate retry strategies across dbt-bigquery (
Browse files Browse the repository at this point in the history
…#1395)

* fix imports
* create a retry factory and move relevant objects from connections
* add on_error method for deadline retries
* remove dependency on retry_and_handle from cancel_open
* remove dependencies on retry_and_handle
* remove timeout methods from connection manager
* add retry to get_bq_table
* move client factory to credentials module so that on_error can be moved to the retry factory in the retry module
* move on_error factory to retry module
* move client factories from python_submissions module to credentials module
* create a clients module
* retry all client factories by default
* move polling from manual check in python_submissions module into retry_factory
* move load_dataframe logic from adapter to connection manager, use the built-in timeout argument instead of a manual polling method
* move upload_file logic from adapter to connection manager, use the built-in timeout argument instead of a manual polling method, remove the manual polling method
* move the retry to polling for done instead of create
* align new retries with original methods, simplify retry factory
* create a method for the dataproc endpoint
* make imports explicit, remove unused constant
* update names in clients.py to follow the naming convention
* update names in connections.py to follow the naming convention
* update names in credentials.py to follow the naming convention
* update names in python_submissions.py to follow the naming convention
* update names in retry.py to follow the naming convention

---------

Co-authored-by: Colin Rogers <111200756+colin-rogers-dbt@users.noreply.github.com>
  • Loading branch information
mikealfare and colin-rogers-dbt authored Nov 20, 2024
1 parent 75142ac commit 83bb413
Show file tree
Hide file tree
Showing 15 changed files with 713 additions and 715 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Under the Hood-20241107-143856.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Under the Hood
body: Create a retry factory to simplify retry strategies across dbt-bigquery
time: 2024-11-07T14:38:56.210445-05:00
custom:
Author: mikealfare osalama
Issue: "1395"
69 changes: 69 additions & 0 deletions dbt/adapters/bigquery/clients.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
from google.api_core.client_info import ClientInfo
from google.api_core.client_options import ClientOptions
from google.api_core.retry import Retry
from google.auth.exceptions import DefaultCredentialsError
from google.cloud.bigquery import Client as BigQueryClient
from google.cloud.dataproc_v1 import BatchControllerClient, JobControllerClient
from google.cloud.storage import Client as StorageClient

from dbt.adapters.events.logging import AdapterLogger

import dbt.adapters.bigquery.__version__ as dbt_version
from dbt.adapters.bigquery.credentials import (
BigQueryCredentials,
create_google_credentials,
set_default_credentials,
)


_logger = AdapterLogger("BigQuery")


def create_bigquery_client(credentials: BigQueryCredentials) -> BigQueryClient:
try:
return _create_bigquery_client(credentials)
except DefaultCredentialsError:
_logger.info("Please log into GCP to continue")
set_default_credentials()
return _create_bigquery_client(credentials)


@Retry() # google decorator. retries on transient errors with exponential backoff
def create_gcs_client(credentials: BigQueryCredentials) -> StorageClient:
return StorageClient(
project=credentials.execution_project,
credentials=create_google_credentials(credentials),
)


@Retry() # google decorator. retries on transient errors with exponential backoff
def create_dataproc_job_controller_client(credentials: BigQueryCredentials) -> JobControllerClient:
return JobControllerClient(
credentials=create_google_credentials(credentials),
client_options=ClientOptions(api_endpoint=_dataproc_endpoint(credentials)),
)


@Retry() # google decorator. retries on transient errors with exponential backoff
def create_dataproc_batch_controller_client(
credentials: BigQueryCredentials,
) -> BatchControllerClient:
return BatchControllerClient(
credentials=create_google_credentials(credentials),
client_options=ClientOptions(api_endpoint=_dataproc_endpoint(credentials)),
)


@Retry() # google decorator. retries on transient errors with exponential backoff
def _create_bigquery_client(credentials: BigQueryCredentials) -> BigQueryClient:
return BigQueryClient(
credentials.execution_project,
create_google_credentials(credentials),
location=getattr(credentials, "location", None),
client_info=ClientInfo(user_agent=f"dbt-bigquery-{dbt_version.version}"),
client_options=ClientOptions(quota_project_id=credentials.quota_project),
)


def _dataproc_endpoint(credentials: BigQueryCredentials) -> str:
return f"{credentials.dataproc_region}-dataproc.googleapis.com:443"
Loading

0 comments on commit 83bb413

Please sign in to comment.