Skip to content

Commit

Permalink
Added a migration state to skip already migrated tables (#325)
Browse files Browse the repository at this point in the history
Add an in-memory migration state used to skip tables that have already
been migrated

---------

Co-authored-by: Serge Smertin <259697+nfx@users.noreply.github.com>
  • Loading branch information
2 people authored and FastLee committed Sep 29, 2023
1 parent b6015a6 commit 8249711
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 16 deletions.
21 changes: 17 additions & 4 deletions src/databricks/labs/ucx/hive_metastore/tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,17 +152,15 @@ def __init__(
tc: TablesCrawler,
ws: WorkspaceClient,
backend: SqlBackend,
inventory_database: str,
default_catalog=None,
database_to_catalog_mapping: dict[str, str] | None = None,
):
self._tc = tc
self._backend = backend
self._ws = ws
self._inventory_database = inventory_database
self._database_to_catalog_mapping = database_to_catalog_mapping
self._seen_tables = {}
self._default_catalog = self._init_default_catalog(default_catalog)
self._seen_tables = {}

@staticmethod
def _init_default_catalog(default_catalog):
Expand All @@ -172,6 +170,7 @@ def _init_default_catalog(default_catalog):
return "ucx_default" # TODO : Fetch current workspace name and append it to the default catalog.

def migrate_tables(self):
self._init_seen_tables()
tasks = []
for table in self._tc.snapshot():
target_catalog = self._default_catalog
Expand All @@ -184,12 +183,26 @@ def _migrate_table(self, target_catalog, table):
try:
sql = table.uc_create_sql(target_catalog)
logger.debug(f"Migrating table {table.key} to using SQL query: {sql}")
target = f"{target_catalog}.{table.database}.{table.name}".lower()

if table.object_type == "MANAGED":
if self._table_already_upgraded(target):
logger.info(f"Table {table.key} already upgraded to {self._seen_tables[target]}")
elif table.object_type == "MANAGED":
self._backend.execute(sql)
self._backend.execute(table.sql_alter_to(target_catalog))
self._backend.execute(table.sql_alter_from(target_catalog))
self._seen_tables[target] = table.key
else:
logger.info(f"Table {table.key} is a {table.object_type} and is not supported for migration yet ")
except Exception as e:
logger.error(f"Could not create table {table.name} because: {e}")

def _init_seen_tables(self):
for catalog in self._ws.catalogs.list():
for schema in self._ws.schemas.list(catalog_name=catalog.name):
for table in self._ws.tables.list(catalog_name=catalog.name, schema_name=schema.name):
if table.properties is not None and "upgraded_from" in table.properties:
self._seen_tables[table.full_name.lower()] = table.properties["upgraded_from"].lower()

def _table_already_upgraded(self, target) -> bool:
return target in self._seen_tables
12 changes: 10 additions & 2 deletions tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def create(*, catalog: str = "hive_metastore", schema: str | None = None):
return schema

yield from factory( # noqa: F405
"schema", create, lambda schema_name: sql_exec(f"DROP SCHEMA IF EXISTS {schema_name} CASCADE")
"schema", create, lambda schema: sql_exec(f"DROP SCHEMA IF EXISTS {schema} CASCADE")
)


Expand All @@ -104,15 +104,18 @@ def make_table(sql_exec, make_schema, make_random):
def create(
*,
catalog="hive_metastore",
name: str | None = None,
schema: str | None = None,
ctas: str | None = None,
non_delta: bool = False,
external: bool = False,
view: bool = False,
tbl_properties: dict[str, str] | None = None,
):
if schema is None:
schema = make_schema(catalog=catalog)
name = f"{schema}.ucx_T{make_random(4)}".lower()
if name is None:
name = f"{schema}.ucx_T{make_random(4)}".lower()
ddl = f'CREATE {"VIEW" if view else "TABLE"} {name}'
if ctas is not None:
# temporary (if not view)
Expand All @@ -128,6 +131,10 @@ def create(
else:
# managed table
ddl = f"{ddl} (id INT, value STRING)"
if tbl_properties:
tbl_properties = ",".join([f" '{k}' = '{v}' " for k, v in tbl_properties.items()])
ddl = f"{ddl} TBLPROPERTIES ({tbl_properties})"

sql_exec(ddl)
return name

Expand All @@ -150,6 +157,7 @@ def test_table_fixture(make_table):
logger.info(f"Created new external JSON table in new schema: {make_table(non_delta=True)}")
logger.info(f'Created new tmp table in new schema: {make_table(ctas="SELECT 2+2 AS four")}')
logger.info(f'Created new view in new schema: {make_table(view=True, ctas="SELECT 2+2 AS four")}')
logger.info(f'Created table with properties: {make_table(tbl_properties={"test": "tableproperty"})}')


@pytest.fixture
Expand Down
40 changes: 37 additions & 3 deletions tests/integration/hive_metastore/test_migrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ def test_migrate_managed_tables(ws, make_catalog, make_schema, make_table):

managed_table = make_table(schema=schema_a)

logger.info(f"target catalog={target_catalog}, managed_table={managed_table}")
logger.info(f"target_catalog={target_catalog}, managed_table={managed_table}")

inventory_schema = make_schema(catalog="hive_metastore")
_, inventory_schema = inventory_schema.split(".")

backend = StatementExecutionBackend(ws, os.environ["TEST_DEFAULT_WAREHOUSE_ID"])
crawler = TablesCrawler(backend, inventory_schema)
tm = TablesMigrate(crawler, ws, backend, target_catalog, inventory_schema)
tm = TablesMigrate(crawler, ws, backend, target_catalog)
tm.migrate_tables()

target_tables = list(backend.fetch(f"SHOW TABLES IN {target_catalog}.{target_schema}"))
Expand All @@ -38,13 +38,47 @@ def test_migrate_managed_tables(ws, make_catalog, make_schema, make_table):
assert target_table_properties["upgraded_from"] == managed_table


def test_migrate_tables_with_cache_should_not_create_table(ws, make_random, make_catalog, make_schema, make_table):
    """When the target table already carries an "upgraded_from" property,
    migrate_tables must skip it rather than create a duplicate copy."""
    target_catalog = make_catalog()
    schema_a = make_schema(catalog="hive_metastore")
    _, target_schema = schema_a.split(".")

    make_schema(catalog=target_catalog, schema=target_schema)

    table_name = make_random().lower()
    target_table = f"{target_catalog}.{target_schema}.{table_name}"
    source_table = f"hive_metastore.{target_schema}.{table_name}"
    # Pre-create both sides of an already-completed migration so the
    # in-memory state recognises the source table as upgraded.
    target_managed_table = make_table(name=target_table, tbl_properties={"upgraded_from": f"{source_table}"})
    source_managed_table = make_table(name=source_table, tbl_properties={"upgraded_from": f"{target_table}"})

    # Fixed: the original f-string concatenation had no separator between
    # the source/target fields and a stray trailing f"", producing a
    # run-together log line.
    logger.info(
        f"target_catalog={target_catalog}, "
        f"source_managed_table={source_managed_table}, "
        f"target_managed_table={target_managed_table}"
    )

    inventory_schema = make_schema(catalog="hive_metastore")
    _, inventory_schema = inventory_schema.split(".")

    backend = StatementExecutionBackend(ws, os.environ["TEST_DEFAULT_WAREHOUSE_ID"])
    crawler = TablesCrawler(backend, inventory_schema)
    tm = TablesMigrate(crawler, ws, backend, target_catalog)
    tm.migrate_tables()

    # Only the pre-existing target table may remain: the migration must
    # not have created a second table in the target schema.
    target_tables = list(backend.fetch(f"SHOW TABLES IN {target_catalog}.{target_schema}"))
    assert len(target_tables) == 1
    assert target_tables[0]["database"] == target_schema
    assert target_tables[0]["tableName"] == table_name


@pytest.mark.skip(reason="Needs Storage credential + External Location in place")
def test_migrate_external_table(ws, make_catalog, make_schema, make_table):
target_catalog = make_catalog()
schema_a = make_schema(catalog="hive_metastore")
_, target_schema = schema_a.split(".")

make_schema(catalog=target_catalog, schema_name=target_schema)
make_schema(catalog=target_catalog, schema=target_schema)

external_table = make_table(schema=schema_a, external=True)

Expand Down
34 changes: 27 additions & 7 deletions tests/unit/hive_metastore/test_migrate.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging
from unittest.mock import MagicMock

import pytest
from databricks.sdk.service.catalog import CatalogInfo, SchemaInfo, TableInfo

from databricks.labs.ucx.hive_metastore.tables import TablesCrawler, TablesMigrate

Expand All @@ -28,7 +28,7 @@ def test_migrate_managed_tables_should_produce_proper_queries():
backend = MockBackend(fails_on_first=errors, rows=rows)
tc = TablesCrawler(backend, "inventory_database")
client = MagicMock()
tm = TablesMigrate(tc, client, backend, "")
tm = TablesMigrate(tc, client, backend)
tm.migrate_tables()

assert (list(backend.queries)) == [
Expand All @@ -39,18 +39,22 @@ def test_migrate_managed_tables_should_produce_proper_queries():
]


@pytest.mark.skip(reason="Not implemented yet")
def test_migrate_managed_tables_should_do_nothing_if_upgrade_tag_is_present():
errors = {}
rows = {
"SELECT": [
("hive_metastore", "db1", "managed", "MANAGED", "DELTA", None, None, "[upgraded_to=target]"),
("hive_metastore", "db1", "managed", "MANAGED", "DELTA", None, None),
]
}
backend = MockBackend(fails_on_first=errors, rows=rows)
tc = TablesCrawler(backend, "inventory_database")
client = MagicMock()
tm = TablesMigrate(tc, client, backend, "")
client.catalogs.list.return_value = [CatalogInfo(name="catalog_1")]
client.schemas.list.return_value = [SchemaInfo(name="db1")]
client.tables.list.return_value = [
TableInfo(full_name="catalog_1.db1.managed", properties={"upgraded_from": "hive_metastore.db1.managed"})
]
tm = TablesMigrate(tc, client, backend, default_catalog="catalog_1")
tm.migrate_tables()

assert (list(backend.queries)) == ["SELECT * FROM hive_metastore.inventory_database.tables"]
Expand All @@ -67,7 +71,7 @@ def test_migrate_tables_should_migrate_tables_to_default_catalog_if_not_found_in
tc = TablesCrawler(backend, "inventory_database")
client = MagicMock()
database_to_catalog_mapping = {"db1": "catalog_1", "db2": "catalog_2"}
tm = TablesMigrate(tc, client, backend, "", database_to_catalog_mapping=database_to_catalog_mapping)
tm = TablesMigrate(tc, client, backend, database_to_catalog_mapping=database_to_catalog_mapping)
tm.migrate_tables()

assert (list(backend.queries)) == [
Expand All @@ -88,7 +92,7 @@ def test_migrate_tables_should_migrate_tables_to_default_catalog_if_specified():
backend = MockBackend(fails_on_first=errors, rows=rows)
tc = TablesCrawler(backend, "inventory_database")
client = MagicMock()
tm = TablesMigrate(tc, client, backend, "", default_catalog="test_catalog")
tm = TablesMigrate(tc, client, backend, default_catalog="test_catalog")
tm.migrate_tables()

assert (list(backend.queries)) == [
Expand All @@ -97,3 +101,19 @@ def test_migrate_tables_should_migrate_tables_to_default_catalog_if_specified():
"ALTER TABLE hive_metastore.db1.managed SET TBLPROPERTIES ('upgraded_to' = 'test_catalog.db1.managed');",
"ALTER TABLE test_catalog.db1.managed SET TBLPROPERTIES ('upgraded_from' = 'hive_metastore.db1.managed');",
]


def test_migrate_tables_should_add_table_to_cache_when_migrated():
    """After a successful managed-table migration, the in-memory cache maps
    the fully-qualified target name to the source table key."""
    rows = {
        "SELECT": [
            ("hive_metastore", "db1", "managed", "MANAGED", "DELTA", None, None),
        ]
    }
    backend = MockBackend(fails_on_first={}, rows=rows)
    crawler = TablesCrawler(backend, "inventory_database")
    workspace_client = MagicMock()
    migrator = TablesMigrate(crawler, workspace_client, backend, default_catalog="test_catalog")
    migrator.migrate_tables()

    assert migrator._seen_tables == {"test_catalog.db1.managed": "hive_metastore.db1.managed"}

0 comments on commit 8249711

Please sign in to comment.