Skip to content

Commit

Permalink
CrateDB: Derive from dbt-postgres. Remove Python adapter code.
Browse files Browse the repository at this point in the history
  • Loading branch information
amotl committed Nov 20, 2024
1 parent 926a551 commit 6d37e3f
Show file tree
Hide file tree
Showing 12 changed files with 39 additions and 325 deletions.
13 changes: 3 additions & 10 deletions dbt/adapters/cratedb/column.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,5 @@
from dbt.adapters.base import Column
from dbt.adapters.postgres import PostgresColumn


class CrateDBColumn(Column):
@property
def data_type(self):
# on cratedb, do not convert 'text' or 'varchar' to 'varchar()'
if self.dtype.lower() == "text" or (
self.dtype.lower() == "character varying" and self.char_size is None
):
return self.dtype
return super().data_type
class CrateDBColumn(PostgresColumn):
pass
4 changes: 2 additions & 2 deletions dbt/adapters/cratedb/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
from dbt.adapters.events.logging import AdapterLogger
from dbt.adapters.events.types import TypeCodeNotFound

from dbt.adapters.cratedb.record import PostgresRecordReplayHandle
from dbt.adapters.sql import SQLConnectionManager
from dbt_common.exceptions import DbtDatabaseError, DbtRuntimeError
from dbt_common.events.functions import warn_or_error
Expand All @@ -21,6 +20,7 @@
import psycopg2
from typing_extensions import Annotated

from dbt.adapters.cratedb.record.handle import CrateDBRecordReplayHandle
from dbt.adapters.cratedb.util import SQLStatement

logger = AdapterLogger("CrateDB")
Expand Down Expand Up @@ -183,7 +183,7 @@ def connect():
if rec_mode is not None:
# If using the record/replay mechanism, regardless of mode, we
# use a wrapper.
handle = PostgresRecordReplayHandle(handle, connection)
handle = CrateDBRecordReplayHandle(handle, connection)

if credentials.role:
handle.cursor().execute("set role {}".format(credentials.role))
Expand Down
52 changes: 6 additions & 46 deletions dbt/adapters/cratedb/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,8 @@
CrossDbReferenceProhibitedError,
IndexConfigError,
IndexConfigNotDictError,
UnexpectedDbReferenceError,
)
from dbt.adapters.sql import SQLAdapter
from dbt.adapters.postgres import PostgresAdapter
from dbt_common.contracts.constraints import ConstraintType
from dbt_common.dataclass_schema import ValidationError, dbtClassMixin
from dbt_common.exceptions import DbtRuntimeError
Expand Down Expand Up @@ -64,19 +63,19 @@ class CrateDBConfig(AdapterConfig):
indexes: Optional[List[CrateDBIndexConfig]] = None


class CrateDBAdapter(SQLAdapter):
class CrateDBAdapter(PostgresAdapter):
Relation = CrateDBRelation
ConnectionManager = CrateDBConnectionManager
ConnectionManager = CrateDBConnectionManager # type: ignore[assignment]
Column = CrateDBColumn

AdapterSpecificConfigs = CrateDBConfig
AdapterSpecificConfigs = CrateDBConfig # type: ignore[assignment]

CONSTRAINT_SUPPORT = {
ConstraintType.check: ConstraintSupport.ENFORCED,
ConstraintType.not_null: ConstraintSupport.ENFORCED,
ConstraintType.unique: ConstraintSupport.ENFORCED,
ConstraintType.unique: ConstraintSupport.NOT_SUPPORTED,
ConstraintType.primary_key: ConstraintSupport.ENFORCED,
ConstraintType.foreign_key: ConstraintSupport.ENFORCED,
ConstraintType.foreign_key: ConstraintSupport.NOT_SUPPORTED,
}

CATALOG_BY_RELATION_SUPPORT = True
Expand All @@ -85,20 +84,6 @@ class CrateDBAdapter(SQLAdapter):
{Capability.SchemaMetadataByRelations: CapabilitySupport(support=Support.Full)}
)

@classmethod
def date_function(cls):
return "now()"

@available
def verify_database(self, database):
if database.startswith('"'):
database = database.strip('"')
expected = self.config.credentials.database
if database.lower() != expected.lower():
raise UnexpectedDbReferenceError(self.type(), database, expected)
# return an empty string on success so macros can call this
return ""

@available
def parse_index(self, raw_index: Any) -> Optional[CrateDBIndexConfig]:
return CrateDBIndexConfig.parse(raw_index)
Expand Down Expand Up @@ -131,31 +116,6 @@ def _get_catalog_schemas(self, manifest):
except DbtRuntimeError as exc:
raise CrossDbReferenceProhibitedError(self.type(), exc.msg)

def _link_cached_relations(self, manifest) -> None:
schemas: Set[str] = set()
relations_schemas = self._get_cache_schemas(manifest)
for relation in relations_schemas:
self.verify_database(relation.database)
schemas.add(relation.schema.lower()) # type: ignore

self._link_cached_database_relations(schemas)

def _relations_cache_for_schemas(self, manifest, cache_schemas=None):
super()._relations_cache_for_schemas(manifest, cache_schemas)
self._link_cached_relations(manifest)

def timestamp_add_sql(self, add_to: str, number: int = 1, interval: str = "hour") -> str:
return f"{add_to} + interval '{number} {interval}'"

def valid_incremental_strategies(self):
"""The set of standard builtin strategies which this adapter supports out-of-the-box.
Not used to validate custom strategies defined by end users.
"""
return ["append", "delete+insert", "merge", "microbatch"]

def debug_query(self):
self.execute("select 1 as id")

def get_rows_different_sql(
self,
relation_a: BaseRelation,
Expand Down
3 changes: 1 addition & 2 deletions dbt/adapters/cratedb/record/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
from dbt.adapters.cratedb.record.cursor.cursor import PostgresRecordReplayCursor
from dbt.adapters.cratedb.record.handle import PostgresRecordReplayHandle
from dbt.adapters.cratedb.record.handle import CrateDBRecordReplayHandle
15 changes: 0 additions & 15 deletions dbt/adapters/cratedb/record/cursor/cursor.py

This file was deleted.

21 changes: 0 additions & 21 deletions dbt/adapters/cratedb/record/cursor/status.py

This file was deleted.

13 changes: 3 additions & 10 deletions dbt/adapters/cratedb/record/handle.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,5 @@
from dbt.adapters.record import RecordReplayHandle
from dbt.adapters.postgres.record import PostgresRecordReplayHandle

from dbt.adapters.cratedb.record.cursor.cursor import PostgresRecordReplayCursor


class PostgresRecordReplayHandle(RecordReplayHandle):
"""A custom extension of RecordReplayHandle that returns
a psycopg-specific PostgresRecordReplayCursor object."""

def cursor(self):
cursor = None if self.native_handle is None else self.native_handle.cursor()
return PostgresRecordReplayCursor(cursor, self.connection)
class CrateDBRecordReplayHandle(PostgresRecordReplayHandle):
pass
105 changes: 3 additions & 102 deletions dbt/adapters/cratedb/relation.py
Original file line number Diff line number Diff line change
@@ -1,111 +1,12 @@
from dataclasses import dataclass, field
from typing import FrozenSet, List, Optional

from dbt.adapters.base.relation import BaseRelation
from dbt.adapters.contracts.relation import RelationConfig, RelationType
from dbt.adapters.relation_configs import (
RelationConfigChangeAction,
RelationResults,
)
from dbt_common.exceptions import DbtRuntimeError
from dataclasses import dataclass
from dbt.adapters.postgres import PostgresRelation

from dbt.adapters.cratedb.relation_configs import (
MAX_CHARACTERS_IN_IDENTIFIER,
PostgresIndexConfig,
PostgresIndexConfigChange,
PostgresMaterializedViewConfig,
PostgresMaterializedViewConfigChangeCollection,
)


@dataclass(frozen=True, eq=False, repr=False)
class CrateDBRelation(BaseRelation):
renameable_relations: FrozenSet[RelationType] = field(
default_factory=lambda: frozenset(
{
RelationType.View,
RelationType.Table,
}
)
)
replaceable_relations: FrozenSet[RelationType] = field(
default_factory=lambda: frozenset(
{
RelationType.View,
RelationType.Table,
}
)
)

def __post_init__(self):
# Check for length of Postgres table/view names.
# Check self.type to exclude test relation identifiers
if (
self.identifier is not None
and self.type is not None
and len(self.identifier) > self.relation_max_name_length()
):
raise DbtRuntimeError(
f"Relation name '{self.identifier}' "
f"is longer than {self.relation_max_name_length()} characters"
)

class CrateDBRelation(PostgresRelation):
def relation_max_name_length(self):
return MAX_CHARACTERS_IN_IDENTIFIER

def get_materialized_view_config_change_collection(
self, relation_results: RelationResults, relation_config: RelationConfig
) -> Optional[PostgresMaterializedViewConfigChangeCollection]:
config_change_collection = PostgresMaterializedViewConfigChangeCollection()

existing_materialized_view = PostgresMaterializedViewConfig.from_relation_results(
relation_results
)
new_materialized_view = PostgresMaterializedViewConfig.from_config(relation_config)

config_change_collection.indexes = self._get_index_config_changes(
existing_materialized_view.indexes, new_materialized_view.indexes
)

# we return `None` instead of an empty `PostgresMaterializedViewConfigChangeCollection` object
# so that it's easier and more extensible to check in the materialization:
# `core/../materializations/materialized_view.sql` :
# {% if configuration_changes is none %}
if config_change_collection.has_changes:
return config_change_collection
return None

def _get_index_config_changes(
self,
existing_indexes: FrozenSet[PostgresIndexConfig],
new_indexes: FrozenSet[PostgresIndexConfig],
) -> List[PostgresIndexConfigChange]:
"""
Get the index updates that will occur as a result of a new run
There are four scenarios:
1. Indexes are equal -> don't return these
2. Index is new -> create these
3. Index is old -> drop these
4. Indexes are not equal -> drop old, create new -> two actions
*Note:*
The order of the operations matters here because if the same index is dropped and recreated
(e.g. via --full-refresh) then we need to drop it first, then create it.
Returns: an ordered list of index updates in the form {"action": "drop/create", "context": <IndexConfig>}
"""
drop_changes = [
PostgresIndexConfigChange.from_dict(
{"action": RelationConfigChangeAction.drop, "context": index}
)
for index in existing_indexes.difference(new_indexes)
]
create_changes = [
PostgresIndexConfigChange.from_dict(
{"action": RelationConfigChangeAction.create, "context": index}
)
for index in new_indexes.difference(existing_indexes)
]
return drop_changes + create_changes # type: ignore[return-value]
4 changes: 2 additions & 2 deletions dbt/adapters/cratedb/relation_configs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
MAX_CHARACTERS_IN_IDENTIFIER,
)
from dbt.adapters.cratedb.relation_configs.index import (
PostgresIndexConfig,
PostgresIndexConfigChange,
CrateDBIndexConfig,
CrateDBIndexConfigChange,
)
from dbt.adapters.cratedb.relation_configs.materialized_view import (
PostgresMaterializedViewConfig,
Expand Down
Loading

0 comments on commit 6d37e3f

Please sign in to comment.