Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Alternate approach to get location for Bigquery tables #1449

Merged
merged 7 commits into from
Dec 19, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 5 additions & 19 deletions python-sdk/src/astro/databases/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,11 +198,13 @@ def default_metadata(self) -> Metadata:
def populate_table_metadata(self, table: BaseTable) -> BaseTable:
"""
Given a table, check if the table has metadata.
If the metadata is missing, and the database has metadata, assign it to the table.
If the metadata is missing and source_table is defined, copy the attrs from that.
If the source_table is missing, and the database has metadata, assign it to the table.
If the table schema was not defined by the end, retrieve the user-defined schema.
This method performs the changes in-place and also returns the table.

:param table: Table to potentially have their metadata changed
:param source_table: Table to copy metadata from
kaxil marked this conversation as resolved.
Show resolved Hide resolved
:return table: Return the modified table
"""
if table.metadata and table.metadata.is_empty() and self.default_metadata:
Expand Down Expand Up @@ -362,7 +364,7 @@ def create_schema_and_table_if_needed(
is_file_pattern_based_schema_autodetection_supported = (
self.check_file_pattern_based_schema_autodetection_is_supported(source_file=file)
)
if self.schema_exists(table.metadata.schema) and if_exists == "replace":
if if_exists == "replace":
kaxil marked this conversation as resolved.
Show resolved Hide resolved
self.drop_table(table)
if use_native_support and is_schema_autodetection_supported and not file.is_pattern():
return
Expand Down Expand Up @@ -698,9 +700,7 @@ def export_table_to_file(
# Schema Management
# ---------------------------------------------------------

def create_schema_if_needed(
self, schema: str | None, location: str | None = None # skipcq: PYL-W0613
) -> None:
def create_schema_if_needed(self, schema: str | None) -> None:
kaxil marked this conversation as resolved.
Show resolved Hide resolved
"""
This function checks if the expected schema exists in the database. If the schema does not exist,
it will attempt to create it.
Expand All @@ -721,20 +721,6 @@ def schema_exists(self, schema: str) -> bool:
"""
raise NotImplementedError

def get_schema_region(self, schema: str | None = None) -> str: # skipcq: PYL-W0613, PYL-R0201
"""
Get region where the schema is created
:param schema: namespace
:return:
"""
return ""

def check_same_region(self, table: BaseTable, other_table: BaseTable): # skipcq: PYL-W0613, PYL-R0201
"""
Check if two tables are from the same database region
"""
return True

# ---------------------------------------------------------
# Context & Template Rendering methods (Transformations)
# ---------------------------------------------------------
Expand Down
50 changes: 37 additions & 13 deletions python-sdk/src/astro/databases/google/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,36 @@ def default_metadata(self) -> Metadata:
return Metadata(
schema=self.DEFAULT_SCHEMA,
database=self.hook.project_id,
region=BIGQUERY_SCHEMA_LOCATION,
) # type: ignore

def populate_table_metadata(self, table: BaseTable) -> BaseTable:
    """Fill in missing metadata on *table* in-place and return it.

    A temp table inherits its metadata from the operator's input table when
    that input carries metadata; otherwise the database default metadata is
    applied, and finally the default schema is used as a last resort.

    :param table: table whose metadata may be populated
    :return: the same table, possibly mutated
    """
    input_has_metadata = bool(self.table) and not self.table.metadata.is_empty()
    table_needs_metadata = bool(table.metadata) and table.metadata.is_empty()

    if table.temp and input_has_metadata and table_needs_metadata:
        return self._populate_temp_table_metadata_from_input_table(table)

    if table_needs_metadata and self.default_metadata:
        table.metadata = self.default_metadata
    if not table.metadata.schema:
        table.metadata.schema = self.DEFAULT_SCHEMA
    return table

def _populate_temp_table_metadata_from_input_table(self, temp_table: BaseTable) -> BaseTable:
    """Derive *temp_table*'s metadata from the operator's input table.

    The temp table is created in the default schema when the input table's
    dataset lives in the same BigQuery location; otherwise a location-suffixed
    schema name is used. Dashes are replaced with underscores because BigQuery
    does not allow ``-`` in dataset names.

    :param temp_table: temporary table whose metadata will be assigned
    :return: the same temp table, possibly mutated
    """
    if not self.table:
        return temp_table

    input_location = self._get_schema_location(self.table.metadata.schema)
    default_location = self._get_schema_location(self.DEFAULT_SCHEMA)

    if input_location == default_location:
        schema = self.DEFAULT_SCHEMA
    else:
        schema = f"{self.DEFAULT_SCHEMA}__{input_location.replace('-', '_')}"

    temp_table.metadata = Metadata(
        schema=schema,
        database=self.table.metadata.database or self.hook.project_id,
    )
    return temp_table

kaxil marked this conversation as resolved.
Show resolved Hide resolved
def schema_exists(self, schema: str) -> bool:
"""
Checks if a dataset exists in the BigQuery
Expand All @@ -149,11 +176,11 @@ def schema_exists(self, schema: str) -> bool:
return False
return True

def get_schema_region(self, schema: str | None = None) -> str:
def _get_schema_location(self, schema: str | None = None) -> str:
"""
Get region where the schema is created

:param schema: Bigquery namespace
:return:
"""
if schema is None:
return ""
Expand All @@ -163,14 +190,6 @@ def get_schema_region(self, schema: str | None = None) -> str:
except GoogleNotFound:
return ""

def check_same_region(self, table: BaseTable, other_table: BaseTable):
"""
Check if two tables are from the same database region
"""
table_location = self.get_schema_region(schema=table.metadata.schema)
other_table_location = self.get_schema_region(schema=other_table.metadata.schema)
return table_location == other_table_location

@staticmethod
def get_merge_initialization_query(parameters: tuple) -> str:
"""
Expand Down Expand Up @@ -211,7 +230,7 @@ def load_pandas_dataframe_to_table(
credentials=creds,
)

def create_schema_if_needed(self, schema: str | None, location: str | None = None) -> None:
def create_schema_if_needed(self, schema: str | None) -> None:
"""
This function checks if the expected schema exists in the database. If the schema does not exist,
it will attempt to create it.
Expand All @@ -220,8 +239,12 @@ def create_schema_if_needed(self, schema: str | None, location: str | None = Non
"""
# We check if the schema exists first because BigQuery will fail on a create schema query even if it
# doesn't actually create a schema.
location = location or BIGQUERY_SCHEMA_LOCATION
if schema and not self.schema_exists(schema):

input_table_schema = self.table.metadata.schema if self.table and self.table.metadata else None
input_table_location = self._get_schema_location(input_table_schema)

location = input_table_location or BIGQUERY_SCHEMA_LOCATION
statement = self._create_schema_statement.format(schema, location)
self.run_sql(statement)

Expand Down Expand Up @@ -376,6 +399,7 @@ def load_gs_file_to_table(
if self.is_native_autodetect_schema_available(file=source_file):
load_job_config["autodetect"] = True # type: ignore

# TODO: Fix this -- it should be load_job_config.update(native_support_kwargs)
native_support_kwargs.update(native_support_kwargs)

# Since bigquery has other options besides used here, we need to expose them to end user.
Expand Down
2 changes: 1 addition & 1 deletion python-sdk/src/astro/databases/sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def populate_table_metadata(self, table: BaseTable) -> BaseTable:
table.conn_id = table.conn_id or self.conn_id
return table

def create_schema_if_needed(self, schema: str | None, location: str | None = None) -> None:
def create_schema_if_needed(self, schema: str | None) -> None:
"""
Since SQLite does not have schemas, we do not need to set a schema here.
"""
Expand Down
4 changes: 1 addition & 3 deletions python-sdk/src/astro/databricks/delta.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,7 @@ def schema_exists(self, schema: str) -> bool:
# Schemas do not need to be created for delta, so we can assume this is true
return True

def create_schema_if_needed(
self, schema: str | None, location: str | None = None # skipcq: PYL-W0613
) -> None:
def create_schema_if_needed(self, schema: str | None) -> None:  # skipcq: PYL-W0613
    """No-op: Delta/Databricks does not require schemas to be pre-created.

    :param schema: ignored; kept for interface compatibility with other databases
    """
    return None

Expand Down
23 changes: 1 addition & 22 deletions python-sdk/src/astro/sql/operators/transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
from sqlalchemy.sql.functions import Function

from astro.sql.operators.base_decorator import BaseSQLDecoratedOperator
from astro.utils.table import find_first_table
from astro.utils.typing_compat import Context


Expand Down Expand Up @@ -52,28 +51,8 @@ def __init__(
)

def execute(self, context: Context):
first_table = find_first_table(
op_args=self.op_args, # type: ignore
op_kwargs=self.op_kwargs,
python_callable=self.python_callable,
parameters=self.parameters, # type: ignore
context=context,
)

super().execute(context)

if (
first_table
and self.output_table.temp
and (not self.database_impl.check_same_region(table=first_table, other_table=self.output_table))
):
self.output_table.metadata.region = self.database_impl.get_schema_region(
schema=first_table.metadata.schema
)

self.database_impl.create_schema_if_needed(
self.output_table.metadata.schema, self.output_table.metadata.region
)
self.database_impl.create_schema_if_needed(self.output_table.metadata.schema)
self.database_impl.drop_table(self.output_table)
self.database_impl.create_table_from_select_statement(
statement=self.sql,
Expand Down
26 changes: 4 additions & 22 deletions python-sdk/src/astro/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,6 @@
TEMP_PREFIX = "_tmp"


def metadata_field_converter(val):
if isinstance(val, dict):
if "_schema" in val:
val["schema"] = val.pop("_schema")
return Metadata(**val)
return val


@define
class Metadata:
"""
Expand All @@ -34,25 +26,15 @@ class Metadata:
"""

# This property is used by several databases, including: Postgres, Snowflake and BigQuery ("namespace")
_schema: str | None = None
schema: str | None = None
database: str | None = None
region: str | None = None

# TODO(kaxil) - add deprecation for region param in __init__

def is_empty(self) -> bool:
"""Check if all the fields are None."""
return all(getattr(self, field_name) is None for field_name in fields_dict(self.__class__))

@property
def schema(self):
if self.region:
# We are replacing the `-` with `_` because for bigquery doesn't allow `-` in schema name
return f"{self._schema}__{self.region.replace('-', '_')}"
return self._schema

@schema.setter
def schema(self, value):
self._schema = value


@define(slots=False)
class BaseTable:
Expand All @@ -79,7 +61,7 @@ class BaseTable:
# Setting converter allows passing a dictionary to metadata arg
metadata: Metadata = field(
factory=Metadata,
converter=metadata_field_converter,
converter=lambda val: Metadata(**val) if isinstance(val, dict) else val,
)
columns: list[Column] = field(factory=list)
temp: bool = field(default=False)
Expand Down
22 changes: 2 additions & 20 deletions python-sdk/tests_integration/databases/test_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -467,25 +467,7 @@ def test_export_table_to_file_file_already_exists_raises_exception(
assert err_msg.endswith(f"The file {filepath} already exists.")


def test_get_schema_region():
"""
Test get_schema_region() function
:return:
"""
def test_get_schema_location():
    """Integration check: the pre-created ``tmp_astro`` dataset lives in the US region."""
    database = BigqueryDatabase(conn_id="gcp_conn")
    assert database._get_schema_location("tmp_astro") == "US"


def test_check_same_region():
"""
Test check_same_region() function
:return:
"""
db = BigqueryDatabase(conn_id="gcp_conn")
tableA = Table(conn_id=db.conn_id, metadata=db.default_metadata)
tableB = Table(conn_id=db.conn_id, metadata=db.default_metadata)
assert db.check_same_region(table=tableA, other_table=tableB)

tableA.metadata.schema = "testing_region"
assert not db.check_same_region(table=tableA, other_table=tableB)