Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding support for snowflake dynamic tables to SqlAlchemy Core #531

Merged
merged 11 commits into from
Oct 2, 2024
5 changes: 4 additions & 1 deletion DESCRIPTION.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ Source code is also available at:

# Release Notes

- 1.6.2
- (Unreleased)

- Add support for dynamic tables and required options
- Fixed SAWarning when registering functions with existing name in default namespace
- Fixed SAWarning when registering functions with existing name in default namespace

- v1.6.1(July 9, 2024)
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ development = [
"pytz",
"numpy",
"mock",
"syrupy==4.6.1",
]
pandas = ["snowflake-connector-python[pandas]"]

Expand Down
7 changes: 7 additions & 0 deletions src/snowflake/sqlalchemy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@
VARBINARY,
VARIANT,
)
from .sql.custom_schema import DynamicTable
from .sql.custom_schema.options import AsQuery, TargetLag, TimeUnit, Warehouse
from .util import _url as URL

base.dialect = dialect = snowdialect.dialect
Expand Down Expand Up @@ -113,4 +115,9 @@
"ExternalStage",
"CreateStage",
"CreateFileFormat",
"DynamicTable",
"AsQuery",
"TargetLag",
"TimeUnit",
"Warehouse",
)
1 change: 1 addition & 0 deletions src/snowflake/sqlalchemy/_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@

APPLICATION_NAME = "SnowflakeSQLAlchemy"
SNOWFLAKE_SQLALCHEMY_VERSION = VERSION
DIALECT_NAME = "snowflake"
30 changes: 26 additions & 4 deletions src/snowflake/sqlalchemy/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,16 @@
from sqlalchemy.sql.elements import quoted_name
from sqlalchemy.sql.selectable import Lateral, SelectState

from .compat import IS_VERSION_20, args_reducer, string_types
from .custom_commands import AWSBucket, AzureContainer, ExternalStage
from snowflake.sqlalchemy._constants import DIALECT_NAME
from snowflake.sqlalchemy.compat import IS_VERSION_20, args_reducer, string_types
from snowflake.sqlalchemy.custom_commands import (
AWSBucket,
AzureContainer,
ExternalStage,
)

from .functions import flatten
from .sql.custom_schema.options.table_option_base import TableOptionBase
from .util import (
_find_left_clause_to_join_from,
_set_connection_interpolate_empty_sequences,
Expand Down Expand Up @@ -878,7 +885,7 @@ def get_column_specification(self, column, **kwargs):

return " ".join(colspec)

def post_create_table(self, table):
def handle_cluster_by(self, table):
"""
Handles snowflake-specific ``CREATE TABLE ... CLUSTER BY`` syntax.

Expand Down Expand Up @@ -908,14 +915,29 @@ def post_create_table(self, table):
<BLANKLINE>
"""
text = ""
info = table.dialect_options["snowflake"]
info = table.dialect_options[DIALECT_NAME]
cluster = info.get("clusterby")
if cluster:
text += " CLUSTER BY ({})".format(
", ".join(self.denormalize_column_name(key) for key in cluster)
)
return text

def post_create_table(self, table):
text = self.handle_cluster_by(table)
options = [
option
for _, option in table.dialect_options[DIALECT_NAME].items()
if isinstance(option, TableOptionBase)
]
options.sort(
key=lambda x: (x.__priority__.value, x.__option_name__), reverse=True
)
for option in options:
text += "\t" + option.render_option(self)

return text

def visit_create_stage(self, create_stage, **kw):
"""
This visitor will create the SQL representation for a CREATE STAGE command.
Expand Down
3 changes: 2 additions & 1 deletion src/snowflake/sqlalchemy/snowdialect.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
from snowflake.connector.constants import UTF8
from snowflake.sqlalchemy.compat import returns_unicode

from ._constants import DIALECT_NAME
from .base import (
SnowflakeCompiler,
SnowflakeDDLCompiler,
Expand Down Expand Up @@ -119,7 +120,7 @@


class SnowflakeDialect(default.DefaultDialect):
name = "snowflake"
name = DIALECT_NAME
driver = "snowflake"
max_identifier_length = 255
cte_follows_insert = True
Expand Down
3 changes: 3 additions & 0 deletions src/snowflake/sqlalchemy/sql/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#
# Copyright (c) 2012-2023 Snowflake Computing Inc. All rights reserved.
#
6 changes: 6 additions & 0 deletions src/snowflake/sqlalchemy/sql/custom_schema/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#
# Copyright (c) 2012-2023 Snowflake Computing Inc. All rights reserved.
#
from .dynamic_table import DynamicTable

__all__ = ["DynamicTable"]
51 changes: 51 additions & 0 deletions src/snowflake/sqlalchemy/sql/custom_schema/custom_table_base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#
# Copyright (c) 2012-2023 Snowflake Computing Inc. All rights reserved.
#
import typing
from typing import Any

from sqlalchemy.exc import ArgumentError
from sqlalchemy.sql.schema import MetaData, SchemaItem, Table

from ..._constants import DIALECT_NAME
from ...compat import IS_VERSION_20
from ...custom_commands import NoneType
from .options.table_option import TableOption


class CustomTableBase(Table):
__table_prefix__ = ""
_support_primary_and_foreign_keys = True

def __init__(
self,
name: str,
metadata: MetaData,
*args: SchemaItem,
**kw: Any,
) -> None:
if self.__table_prefix__ != "":
prefixes = kw.get("prefixes", []) + [self.__table_prefix__]
kw.update(prefixes=prefixes)
if not IS_VERSION_20 and hasattr(super(), "_init"):
super()._init(name, metadata, *args, **kw)
else:
super().__init__(name, metadata, *args, **kw)

if not kw.get("autoload_with", False):
self._validate_table()

def _validate_table(self):
if not self._support_primary_and_foreign_keys and (
self.primary_key or self.foreign_keys
):
raise ArgumentError(
f"Primary key and foreign keys are not supported in {self.__table_prefix__} TABLE."
)

return True

def _get_dialect_option(self, option_name: str) -> typing.Optional[TableOption]:
if option_name in self.dialect_options[DIALECT_NAME]:
return self.dialect_options[DIALECT_NAME][option_name]
return NoneType
86 changes: 86 additions & 0 deletions src/snowflake/sqlalchemy/sql/custom_schema/dynamic_table.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
#
# Copyright (c) 2012-2023 Snowflake Computing Inc. All rights reserved.
#

import typing
from typing import Any

from sqlalchemy.exc import ArgumentError
from sqlalchemy.sql.schema import MetaData, SchemaItem

from snowflake.sqlalchemy.custom_commands import NoneType

from .options.target_lag import TargetLag
from .options.warehouse import Warehouse
from .table_from_query import TableFromQueryBase


class DynamicTable(TableFromQueryBase):
"""
A class representing a dynamic table with configurable options and settings.

The `DynamicTable` class allows for the creation and querying of tables with
specific options, such as `Warehouse` and `TargetLag`.

While it does not support reflection at this time, it provides a flexible
interface for creating dynamic tables and management.

"""

__table_prefix__ = "DYNAMIC"

_support_primary_and_foreign_keys = False

@property
def warehouse(self) -> typing.Optional[Warehouse]:
return self._get_dialect_option(Warehouse.__option_name__)

@property
def target_lag(self) -> typing.Optional[TargetLag]:
return self._get_dialect_option(TargetLag.__option_name__)

def __init__(
self,
name: str,
metadata: MetaData,
*args: SchemaItem,
**kw: Any,
) -> None:
if kw.get("_no_init", True):
return
super().__init__(name, metadata, *args, **kw)

def _init(
self,
name: str,
metadata: MetaData,
*args: SchemaItem,
**kw: Any,
) -> None:
super().__init__(name, metadata, *args, **kw)

def _validate_table(self):
missing_attributes = []
if self.target_lag is NoneType:
missing_attributes.append("TargetLag")
if self.warehouse is NoneType:
missing_attributes.append("Warehouse")
if self.as_query is NoneType:
missing_attributes.append("AsQuery")
if missing_attributes:
raise ArgumentError(
"DYNAMIC TABLE must have the following arguments: %s"
% ", ".join(missing_attributes)
)
super()._validate_table()

def __repr__(self) -> str:
return "DynamicTable(%s)" % ", ".join(
[repr(self.name)]
+ [repr(self.metadata)]
+ [repr(x) for x in self.columns]
+ [repr(self.target_lag)]
+ [repr(self.warehouse)]
+ [repr(self.as_query)]
+ [f"{k}={repr(getattr(self, k))}" for k in ["schema"]]
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#
# Copyright (c) 2012-2023 Snowflake Computing Inc. All rights reserved.
#

from .as_query import AsQuery
from .target_lag import TargetLag, TimeUnit
from .warehouse import Warehouse

__all__ = ["Warehouse", "AsQuery", "TargetLag", "TimeUnit"]
62 changes: 62 additions & 0 deletions src/snowflake/sqlalchemy/sql/custom_schema/options/as_query.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
#
# Copyright (c) 2012-2023 Snowflake Computing Inc. All rights reserved.
#
from typing import Union

from sqlalchemy.sql import Selectable

from .table_option import TableOption
from .table_option_base import Priority


class AsQuery(TableOption):
"""Class to represent an AS clause in tables.
This configuration option is used to specify the query from which the table is created.
For further information on this clause, please refer to: https://docs.snowflake.com/en/sql-reference/sql/create-table#create-table-as-select-also-referred-to-as-ctas


AsQuery example usage using an input string:
DynamicTable(
"sometable", metadata,
Column("name", String(50)),
Column("address", String(100)),
AsQuery('select name, address from existing_table where name = "test"')
)

AsQuery example usage using a selectable statement:
DynamicTable(
"sometable",
Base.metadata,
TargetLag(10, TimeUnit.SECONDS),
Warehouse("warehouse"),
AsQuery(select(test_table_1).where(test_table_1.c.id == 23))
)

"""

__option_name__ = "as_query"
__priority__ = Priority.LOWEST

def __init__(self, query: Union[str, Selectable]) -> None:
r"""Construct an as_query object.

:param \*expressions:
AS <query>

"""
self.query = query

@staticmethod
def template() -> str:
return "AS %s"

def get_expression(self):
if isinstance(self.query, Selectable):
return self.query.compile(compile_kwargs={"literal_binds": True})
return self.query

def render_option(self, compiler) -> str:
return AsQuery.template() % (self.get_expression())

def __repr__(self) -> str:
return "Query(%s)" % self.get_expression()
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#
# Copyright (c) 2012-2023 Snowflake Computing Inc. All rights reserved.
#
from typing import Any

from sqlalchemy import exc
from sqlalchemy.sql.base import SchemaEventTarget
from sqlalchemy.sql.schema import SchemaItem, Table

from snowflake.sqlalchemy._constants import DIALECT_NAME

from .table_option_base import TableOptionBase


class TableOption(TableOptionBase, SchemaItem):
def _set_parent(self, parent: SchemaEventTarget, **kw: Any) -> None:
if self.__option_name__ == "default":
raise exc.SQLAlchemyError(f"{self.__class__.__name__} does not has a name")
if not isinstance(parent, Table):
raise exc.SQLAlchemyError(
f"{self.__class__.__name__} option can only be applied to Table"
)
parent.dialect_options[DIALECT_NAME][self.__option_name__] = self

def _set_table_option_parent(self, parent: SchemaEventTarget, **kw: Any) -> None:
pass
Loading
Loading