From 00862afe101a484e0295bcca81b389570f35f58a Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Mon, 2 Sep 2024 20:25:18 +0200 Subject: [PATCH 01/12] MongoDB: Rename columns with leading underscores ... to use double leading underscores. --- CHANGES.md | 1 + cratedb_toolkit/io/mongodb/util.py | 10 ++++++---- tests/io/mongodb/test_util.py | 5 ++++- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index ae678193..e947712f 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -2,6 +2,7 @@ ## Unreleased +- MongoDB: Rename columns with leading underscores to use double leading underscores ## 2024/09/02 v0.0.21 - DynamoDB: Add special decoding for varied lists. diff --git a/cratedb_toolkit/io/mongodb/util.py b/cratedb_toolkit/io/mongodb/util.py index 81e63a53..2e5d0f6b 100644 --- a/cratedb_toolkit/io/mongodb/util.py +++ b/cratedb_toolkit/io/mongodb/util.py @@ -28,12 +28,14 @@ def parse_input_numbers(s: str): def sanitize_field_names(data: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]: """ - CrateDB does not accept leading underscores as top-level column names, like `_foo`. + Rename top-level column names with single leading underscores to double leading underscores. + CrateDB does not accept single leading underscores, like `_id`. - Utility function to rename all relevant column names, keeping their order intact. + This utility function to rename all relevant column names keeps their order intact. + When losing order is acceptable, a more efficient variant could be used. 
""" d = OrderedDictX(data) for name in d.keys(): - if name.startswith("_") and name[1] != "_": - d.rename_key(name, name[1:]) + if name.startswith("_") and not name.startswith("__"): + d.rename_key(name, f"_{name}") return d diff --git a/tests/io/mongodb/test_util.py b/tests/io/mongodb/test_util.py index 731c79d7..0146d0cf 100644 --- a/tests/io/mongodb/test_util.py +++ b/tests/io/mongodb/test_util.py @@ -2,7 +2,7 @@ import pytest -from cratedb_toolkit.io.mongodb.util import parse_input_numbers +from cratedb_toolkit.io.mongodb.util import parse_input_numbers, sanitize_field_names pytestmark = pytest.mark.mongodb @@ -37,3 +37,6 @@ def test_mixed(self): s = "0 1, 3 5-8, 9 12-10" parsed = parse_input_numbers(s) self.assertEqual(parsed, [0, 1, 3, 5, 6, 7, 8, 9, 10, 11, 12]) + + def test_sanitize_field_names(self): + assert sanitize_field_names({"_id": "foo"}) == {"__id": "foo"} From 8f3aeabbd9528f69350d1824828d8707e4bc190e Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Mon, 2 Sep 2024 20:25:51 +0200 Subject: [PATCH 02/12] MongoDB: Add support for UUID types --- CHANGES.md | 1 + cratedb_toolkit/io/mongodb/export.py | 5 +++++ cratedb_toolkit/io/mongodb/extract.py | 4 ++++ cratedb_toolkit/io/mongodb/translate.py | 1 + 4 files changed, 11 insertions(+) diff --git a/CHANGES.md b/CHANGES.md index e947712f..2cad8c2b 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -3,6 +3,7 @@ ## Unreleased - MongoDB: Rename columns with leading underscores to use double leading underscores +- MongoDB: Add support for UUID types ## 2024/09/02 v0.0.21 - DynamoDB: Add special decoding for varied lists. diff --git a/cratedb_toolkit/io/mongodb/export.py b/cratedb_toolkit/io/mongodb/export.py index 87cafc15..bc301fc7 100644 --- a/cratedb_toolkit/io/mongodb/export.py +++ b/cratedb_toolkit/io/mongodb/export.py @@ -24,8 +24,10 @@ Export the documents from a MongoDB collection as JSON, to be ingested into CrateDB. 
""" +import base64 import calendar import typing as t +from uuid import UUID import bsonjs import dateutil.parser as dateparser @@ -65,6 +67,9 @@ def extract_value(value, parent_type=None): """ if isinstance(value, dict): if len(value) == 1: + if "$binary" in value and value["$binary"]["subType"] in ["03", "04"]: + decoded = UUID(bytes=base64.b64decode(value["$binary"]["base64"])) + return extract_value(decoded, parent_type) for k, v in value.items(): if k.startswith("$"): return extract_value(v, k.lstrip("$")) diff --git a/cratedb_toolkit/io/mongodb/extract.py b/cratedb_toolkit/io/mongodb/extract.py index 74e58e8a..9872a505 100644 --- a/cratedb_toolkit/io/mongodb/extract.py +++ b/cratedb_toolkit/io/mongodb/extract.py @@ -67,6 +67,7 @@ import typing as t import bson +from bson import OLD_UUID_SUBTYPE, UUID_SUBTYPE from pymongo.collection import Collection from rich import progress from rich.console import Console @@ -197,4 +198,7 @@ def get_type(value): return "INTEGER" else: return "INT64" + if type_ is bson.binary.Binary: + if value.subtype in [OLD_UUID_SUBTYPE, UUID_SUBTYPE]: + return "UUID" return TYPES_MAP.get(type_, "UNKNOWN") diff --git a/cratedb_toolkit/io/mongodb/translate.py b/cratedb_toolkit/io/mongodb/translate.py index 6d7dcf4a..ce1c42d8 100644 --- a/cratedb_toolkit/io/mongodb/translate.py +++ b/cratedb_toolkit/io/mongodb/translate.py @@ -37,6 +37,7 @@ TYPES = { "OID": "TEXT", + "UUID": "TEXT", "DATETIME": "TIMESTAMP WITH TIME ZONE", "TIMESTAMP": "TIMESTAMP WITHOUT TIME ZONE", "INT64": "BIGINT", From b15743d5513c4e6c7ed17e163bf186874f3da439 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Mon, 2 Sep 2024 20:26:09 +0200 Subject: [PATCH 03/12] MongoDB: Add integration test --- cratedb_toolkit/io/mongodb/cli.py | 6 +--- cratedb_toolkit/io/mongodb/core.py | 5 +-- tests/io/mongodb/test_cli.py | 56 +++++++++++++++++++++++++++++- 3 files changed, 57 insertions(+), 10 deletions(-) diff --git a/cratedb_toolkit/io/mongodb/cli.py 
b/cratedb_toolkit/io/mongodb/cli.py index 48cf27c6..74eb359e 100644 --- a/cratedb_toolkit/io/mongodb/cli.py +++ b/cratedb_toolkit/io/mongodb/cli.py @@ -17,8 +17,6 @@ def extract_parser(subargs): parser = subargs.add_parser("extract", help="Extract a schema from a MongoDB database") parser.add_argument("--url", default="mongodb://localhost:27017", help="MongoDB URL") - parser.add_argument("--host", default="localhost", help="MongoDB host") - parser.add_argument("--port", default=27017, help="MongoDB port") parser.add_argument("--database", required=True, help="MongoDB database") parser.add_argument("--collection", help="MongoDB collection to create a schema for") parser.add_argument( @@ -42,10 +40,8 @@ def translate_parser(subargs): def export_parser(subargs): parser = subargs.add_parser("export", help="Export a MongoDB collection as plain JSON") parser.add_argument("--url", default="mongodb://localhost:27017", help="MongoDB URL") - parser.add_argument("--collection", required=True) - parser.add_argument("--host", default="localhost", help="MongoDB host") - parser.add_argument("--port", default=27017, help="MongoDB port") parser.add_argument("--database", required=True, help="MongoDB database") + parser.add_argument("--collection", required=True, help="MongoDB collection to export") parser.add_argument("--limit", type=int, default=0, required=False, help="Limit export to N documents") parser.add_argument("--transformation", type=Path, required=False, help="Zyp transformation file") diff --git a/cratedb_toolkit/io/mongodb/core.py b/cratedb_toolkit/io/mongodb/core.py index ca0e0772..7e1744ec 100644 --- a/cratedb_toolkit/io/mongodb/core.py +++ b/cratedb_toolkit/io/mongodb/core.py @@ -57,10 +57,7 @@ def gather_collections(database) -> t.List[str]: def get_mongodb_client_database(args, **kwargs) -> t.Tuple[pymongo.MongoClient, pymongo.database.Database]: client: pymongo.MongoClient - if args.url: - client = pymongo.MongoClient(args.url, **kwargs) - else: - client = 
pymongo.MongoClient(args.host, int(args.port), **kwargs) + client = pymongo.MongoClient(args.url, **kwargs) db: pymongo.database.Database = client.get_database(args.database) return client, db diff --git a/tests/io/mongodb/test_cli.py b/tests/io/mongodb/test_cli.py index 0398000f..f0914b80 100644 --- a/tests/io/mongodb/test_cli.py +++ b/tests/io/mongodb/test_cli.py @@ -1,5 +1,8 @@ import os +from unittest import mock +from uuid import UUID +import dateutil import pytest from click.testing import CliRunner from pueblo.testing.dataframe import DataFrameFactory @@ -9,6 +12,7 @@ pytestmark = pytest.mark.mongodb +bson = pytest.importorskip("bson", reason="Skipping tests because bson is not installed") pymongo = pytest.importorskip("pymongo", reason="Skipping tests because pymongo is not installed") pytest.importorskip("rich", reason="Skipping tests because rich is not installed") @@ -29,7 +33,26 @@ def test_version(): assert exitcode == 0 -def test_mongodb_load_table(caplog, cratedb, mongodb): +DOCUMENT_IN = { + "id": bson.Binary.from_uuid(UUID("d575540f-759d-4653-a4c4-4a9e410f1aa1")), + "value": { + "name": "foobar", + "active": True, + "created": dateutil.parser.parse("2020-06-19T15:03:53.727Z"), + }, +} +DOCUMENT_OUT = { + "__id": mock.ANY, + "id": "d575540f-759d-4653-a4c4-4a9e410f1aa1", + "value": { + "name": "foobar", + "active": True, + "created": 1592579033000, + }, +} + + +def test_mongodb_load_table_basic(caplog, cratedb, mongodb): """ CLI test: Invoke `ctk load table` for MongoDB. """ @@ -59,3 +82,34 @@ def test_mongodb_load_table(caplog, cratedb, mongodb): assert cratedb.database.table_exists("testdrive.demo") is True assert cratedb.database.refresh_table("testdrive.demo") is True assert cratedb.database.count_records("testdrive.demo") == 42 + + +def test_mongodb_load_table_real(caplog, cratedb, mongodb): + """ + CLI test: Invoke `ctk load table` for MongoDB. 
+ """ + cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo" + mongodb_url = f"{mongodb.get_connection_url()}/testdrive/demo" + + # Populate source database. + client: pymongo.MongoClient = mongodb.get_connection_client() + testdrive = client.get_database("testdrive") + demo = testdrive.create_collection("demo") + demo.insert_many([DOCUMENT_IN]) + + # Run transfer command. + runner = CliRunner(env={"CRATEDB_SQLALCHEMY_URL": cratedb_url}) + result = runner.invoke( + cli, + args=f"load table {mongodb_url}", + catch_exceptions=False, + ) + assert result.exit_code == 0 + + # Verify data in target database. + assert cratedb.database.table_exists("testdrive.demo") is True + assert cratedb.database.refresh_table("testdrive.demo") is True + assert cratedb.database.count_records("testdrive.demo") == 1 + + results = cratedb.database.run_sql("SELECT * FROM testdrive.demo", records=True) + assert results[0] == DOCUMENT_OUT From 8cdbd027f0a8bfe44f7e00de69adda2a3a945b76 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Mon, 2 Sep 2024 21:18:58 +0200 Subject: [PATCH 04/12] MongoDB: Improve reading timestamps in previous BSON formats bson.errors.InvalidBSON: year 292278994 is out of range Consider Using CodecOptions(datetime_conversion=DATETIME_AUTO) or MongoClient(datetime_conversion='DATETIME_AUTO'). See: https://pymongo.readthedocs.io/en/stable/examples/datetimes.html#handling-out-of-range-datetimes --- CHANGES.md | 1 + cratedb_toolkit/io/mongodb/core.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index 2cad8c2b..0d4d135b 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -4,6 +4,7 @@ ## Unreleased - MongoDB: Rename columns with leading underscores to use double leading underscores - MongoDB: Add support for UUID types +- MongoDB: Improve reading timestamps in previous BSON formats ## 2024/09/02 v0.0.21 - DynamoDB: Add special decoding for varied lists. 
diff --git a/cratedb_toolkit/io/mongodb/core.py b/cratedb_toolkit/io/mongodb/core.py index 7e1744ec..5a342071 100644 --- a/cratedb_toolkit/io/mongodb/core.py +++ b/cratedb_toolkit/io/mongodb/core.py @@ -57,7 +57,7 @@ def gather_collections(database) -> t.List[str]: def get_mongodb_client_database(args, **kwargs) -> t.Tuple[pymongo.MongoClient, pymongo.database.Database]: client: pymongo.MongoClient - client = pymongo.MongoClient(args.url, **kwargs) + client = pymongo.MongoClient(args.url, datetime_conversion="DATETIME_AUTO", **kwargs) db: pymongo.database.Database = client.get_database(args.database) return client, db From 9d14596cfab0e115f35d9de666526f9391df2405 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Mon, 2 Sep 2024 21:26:17 +0200 Subject: [PATCH 05/12] MongoDB: Improve test case about different types of timestamp fields --- tests/io/mongodb/test_cli.py | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/tests/io/mongodb/test_cli.py b/tests/io/mongodb/test_cli.py index f0914b80..5c4498ba 100644 --- a/tests/io/mongodb/test_cli.py +++ b/tests/io/mongodb/test_cli.py @@ -39,6 +39,7 @@ def test_version(): "name": "foobar", "active": True, "created": dateutil.parser.parse("2020-06-19T15:03:53.727Z"), + "timestamp": bson.datetime_ms.DatetimeMS(1455141600000), }, } DOCUMENT_OUT = { @@ -48,8 +49,20 @@ def test_version(): "name": "foobar", "active": True, "created": 1592579033000, + "timestamp": 1455141600000, }, } +DOCUMENT_DDL = """ +CREATE TABLE IF NOT EXISTS "testdrive"."demo" ( + "__id" TEXT, + "id" TEXT, + "value" OBJECT(DYNAMIC) AS ( + "name" TEXT, + "active" BOOLEAN, + "created" TIMESTAMP WITH TIME ZONE, + "timestamp" TIMESTAMP WITH TIME ZONE + ) +)""".lstrip() def test_mongodb_load_table_basic(caplog, cratedb, mongodb): @@ -106,10 +119,15 @@ def test_mongodb_load_table_real(caplog, cratedb, mongodb): ) assert result.exit_code == 0 - # Verify data in target database. + # Verify metadata in target database. 
assert cratedb.database.table_exists("testdrive.demo") is True assert cratedb.database.refresh_table("testdrive.demo") is True assert cratedb.database.count_records("testdrive.demo") == 1 + # Verify content in target database. results = cratedb.database.run_sql("SELECT * FROM testdrive.demo", records=True) assert results[0] == DOCUMENT_OUT + + # Verify schema in target database. + results = cratedb.database.run_sql("SHOW CREATE TABLE testdrive.demo") + assert DOCUMENT_DDL in results[0][0] From a4be36554071db6518f85c42f7e8a8476b7d2615 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Mon, 2 Sep 2024 22:11:06 +0200 Subject: [PATCH 06/12] MongoDB: Fix processing empty arrays/lists By default, assume `TEXT` as inner type. --- CHANGES.md | 1 + cratedb_toolkit/io/mongodb/translate.py | 12 +++++++++- tests/io/mongodb/test_cli.py | 32 ++++++++++++++++++++++--- 3 files changed, 41 insertions(+), 4 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 0d4d135b..4b1aec12 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -5,6 +5,7 @@ - MongoDB: Rename columns with leading underscores to use double leading underscores - MongoDB: Add support for UUID types - MongoDB: Improve reading timestamps in previous BSON formats +- MongoDB: Fix processing empty arrays/lists. By default, assume `TEXT` as inner type. ## 2024/09/02 v0.0.21 - DynamoDB: Add special decoding for varied lists. diff --git a/cratedb_toolkit/io/mongodb/translate.py b/cratedb_toolkit/io/mongodb/translate.py index ce1c42d8..0190d970 100644 --- a/cratedb_toolkit/io/mongodb/translate.py +++ b/cratedb_toolkit/io/mongodb/translate.py @@ -31,10 +31,13 @@ the type with the greatest proportion. """ +import logging from functools import reduce from cratedb_toolkit.io.mongodb.util import sanitize_field_names +logger = logging.getLogger(__name__) + TYPES = { "OID": "TEXT", "UUID": "TEXT", @@ -104,7 +107,14 @@ def determine_type(schema): Determine the type of specific field schema. 
""" - types = schema.get("types", {}) + # That's a workaround for empty arrays/lists. + # Let's assume an inner type of `TEXT`. + # TODO: Review mapping of empty arrays/lists. + if "types" not in schema or not schema["types"]: + logger.warning(f"Unable to determine type for incomplete schema: {schema}") + return "TEXT", None + + types = schema["types"] type_ = max(types, key=lambda item: types[item]["count"]) if type_ in TYPES: sql_type = TYPES.get(type_) diff --git a/tests/io/mongodb/test_cli.py b/tests/io/mongodb/test_cli.py index 5c4498ba..bb71ec04 100644 --- a/tests/io/mongodb/test_cli.py +++ b/tests/io/mongodb/test_cli.py @@ -4,6 +4,7 @@ import dateutil import pytest +import sqlparse from click.testing import CliRunner from pueblo.testing.dataframe import DataFrameFactory @@ -33,13 +34,21 @@ def test_version(): assert exitcode == 0 +DATETIME = dateutil.parser.parse("2020-06-19T15:03:53.727Z") + DOCUMENT_IN = { "id": bson.Binary.from_uuid(UUID("d575540f-759d-4653-a4c4-4a9e410f1aa1")), "value": { "name": "foobar", "active": True, - "created": dateutil.parser.parse("2020-06-19T15:03:53.727Z"), + "created": DATETIME, "timestamp": bson.datetime_ms.DatetimeMS(1455141600000), + "list_date": [DATETIME, DATETIME], + "list_empty": [], + "list_float": [42.42, 43.43], + "list_integer": [42, 43], + "list_object": [{"foo": "bar"}, {"baz": "qux"}], + "list_string": ["foo", "bar"], }, } DOCUMENT_OUT = { @@ -50,6 +59,12 @@ def test_version(): "active": True, "created": 1592579033000, "timestamp": 1455141600000, + "list_date": [1592579033000, 1592579033000], + "list_empty": [], + "list_float": [42.42, 43.43], + "list_integer": [42, 43], + "list_object": [{"foo": "bar"}, {"baz": "qux"}], + "list_string": ["foo", "bar"], }, } DOCUMENT_DDL = """ @@ -60,7 +75,16 @@ def test_version(): "name" TEXT, "active" BOOLEAN, "created" TIMESTAMP WITH TIME ZONE, - "timestamp" TIMESTAMP WITH TIME ZONE + "timestamp" TIMESTAMP WITH TIME ZONE, + "list_date" ARRAY(TIMESTAMP WITH TIME ZONE), + 
"list_empty" ARRAY(TEXT), + "list_float" ARRAY(REAL), + "list_integer" ARRAY(INTEGER), + "list_object" ARRAY(OBJECT(DYNAMIC) AS ( + "foo" TEXT, + "baz" TEXT + )), + "list_string" ARRAY(TEXT) ) )""".lstrip() @@ -130,4 +154,6 @@ def test_mongodb_load_table_real(caplog, cratedb, mongodb): # Verify schema in target database. results = cratedb.database.run_sql("SHOW CREATE TABLE testdrive.demo") - assert DOCUMENT_DDL in results[0][0] + sql = results[0][0] + sql = sqlparse.format(sql) + assert sql.startswith(DOCUMENT_DDL) From dcfb8b70348087c94ede372e70b0b45a8a3cb367 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Mon, 2 Sep 2024 23:20:31 +0200 Subject: [PATCH 07/12] MongoDB: For `ctk load table`, use "partial" scan for inferring schema ... based on the first 10,000 documents. --- CHANGES.md | 2 ++ cratedb_toolkit/io/mongodb/api.py | 2 +- cratedb_toolkit/io/mongodb/extract.py | 8 ++++++-- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 4b1aec12..2e0ab344 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -6,6 +6,8 @@ - MongoDB: Add support for UUID types - MongoDB: Improve reading timestamps in previous BSON formats - MongoDB: Fix processing empty arrays/lists. By default, assume `TEXT` as inner type. +- MongoDB: For `ctk load table`, use "partial" scan for inferring the collection schema, + based on the first 10,000 documents. ## 2024/09/02 v0.0.21 - DynamoDB: Add special decoding for varied lists. 
diff --git a/cratedb_toolkit/io/mongodb/api.py b/cratedb_toolkit/io/mongodb/api.py index c4b2d202..7ae679b0 100644 --- a/cratedb_toolkit/io/mongodb/api.py +++ b/cratedb_toolkit/io/mongodb/api.py @@ -40,7 +40,7 @@ def mongodb_copy(source_url, target_url, transformation: Path = None, limit: int url=str(mongodb_uri), database=mongodb_database, collection=mongodb_collection, - scan="full", + scan="partial", transformation=transformation, limit=limit, ) diff --git a/cratedb_toolkit/io/mongodb/extract.py b/cratedb_toolkit/io/mongodb/extract.py index 9872a505..f7f735eb 100644 --- a/cratedb_toolkit/io/mongodb/extract.py +++ b/cratedb_toolkit/io/mongodb/extract.py @@ -85,6 +85,10 @@ ) +# TODO: Make configurable. +PARTIAL_SCAN_COUNT = 10_000 + + def extract_schema_from_collection(collection: Collection, partial: bool, limit: int = 0) -> t.Dict[str, t.Any]: """ Extract a schema definition from a collection. @@ -95,7 +99,7 @@ def extract_schema_from_collection(collection: Collection, partial: bool, limit: schema: dict = {"count": 0, "document": {}} if partial: - count = 1 + count = PARTIAL_SCAN_COUNT else: count = collection.estimated_document_count() with progressbar: @@ -105,7 +109,7 @@ def extract_schema_from_collection(collection: Collection, partial: bool, limit: schema["count"] += 1 schema["document"] = extract_schema_from_document(document, schema["document"]) progressbar.update(task, advance=1) - if partial: + if partial and schema["count"] >= PARTIAL_SCAN_COUNT: break except KeyboardInterrupt: return schema From abf1e813bf584789a05ec27b2f45843222939820 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Mon, 2 Sep 2024 23:49:05 +0200 Subject: [PATCH 08/12] MongoDB: Skip leaking `UNKNOWN` fields into SQL DDL This means relevant column definitions will not be included into the SQL DDL. 
--- CHANGES.md | 2 ++ cratedb_toolkit/io/mongodb/translate.py | 13 +++++++++---- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 2e0ab344..4a48b5aa 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -8,6 +8,8 @@ - MongoDB: Fix processing empty arrays/lists. By default, assume `TEXT` as inner type. - MongoDB: For `ctk load table`, use "partial" scan for inferring the collection schema, based on the first 10,000 documents. +- MongoDB: Skip leaking `UNKNOWN` fields into SQL DDL. + This means relevant column definitions will not be included into the SQL DDL. ## 2024/09/02 v0.0.21 - DynamoDB: Add special decoding for varied lists. diff --git a/cratedb_toolkit/io/mongodb/translate.py b/cratedb_toolkit/io/mongodb/translate.py index 0190d970..dede652d 100644 --- a/cratedb_toolkit/io/mongodb/translate.py +++ b/cratedb_toolkit/io/mongodb/translate.py @@ -65,11 +65,16 @@ def get_columns_definition(columns): columns_definition = [] for column in columns: - if column[1]: - item = f"{column[1]}\n{column[0]}" + type_, name = column + if name: + item = f"{name}\n{type_}" else: - item = column[0] - columns_definition.append(item) + item = type_ + if type_ == "UNKNOWN": + logger.warning(f"Unable to translate column: {name}") + continue + if item: + columns_definition.append(item) return columns_definition From 2ccc0b22887d334a0f585a36650045c1dc3832b2 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Tue, 3 Sep 2024 02:44:04 +0200 Subject: [PATCH 09/12] MongoDB: Make `ctk load table` use the `data OBJECT(DYNAMIC)` strategy --- CHANGES.md | 1 + cratedb_toolkit/io/mongodb/api.py | 42 +++++++- cratedb_toolkit/io/mongodb/copy.py | 122 +++++++++++++++++++++++ cratedb_toolkit/io/mongodb/export.py | 2 +- examples/zyp/zyp-int64-to-timestamp.yaml | 2 +- tests/io/mongodb/test_cli.py | 49 ++++----- tests/io/mongodb/test_transformation.py | 12 ++- 7 files changed, 191 insertions(+), 39 deletions(-) create mode 100644 cratedb_toolkit/io/mongodb/copy.py diff 
--git a/CHANGES.md b/CHANGES.md index 4a48b5aa..e674ba36 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -10,6 +10,7 @@ based on the first 10,000 documents. - MongoDB: Skip leaking `UNKNOWN` fields into SQL DDL. This means relevant column definitions will not be included into the SQL DDL. +- MongoDB: Make `ctk load table` use the `data OBJECT(DYNAMIC)` mapping strategy. ## 2024/09/02 v0.0.21 - DynamoDB: Add special decoding for varied lists. diff --git a/cratedb_toolkit/io/mongodb/api.py b/cratedb_toolkit/io/mongodb/api.py index 7ae679b0..e95890c9 100644 --- a/cratedb_toolkit/io/mongodb/api.py +++ b/cratedb_toolkit/io/mongodb/api.py @@ -3,7 +3,9 @@ from pathlib import Path from cratedb_toolkit.io.mongodb.cdc import MongoDBCDCRelayCrateDB +from cratedb_toolkit.io.mongodb.copy import MongoDBFullLoad from cratedb_toolkit.io.mongodb.core import export, extract, translate +from cratedb_toolkit.io.mongodb.transform import TransformationManager from cratedb_toolkit.model import DatabaseAddress from cratedb_toolkit.util.cr8 import cr8_insert_json from cratedb_toolkit.util.database import DatabaseAdapter @@ -11,8 +13,10 @@ logger = logging.getLogger(__name__) -def mongodb_copy(source_url, target_url, transformation: Path = None, limit: int = 0, progress: bool = False): +def mongodb_copy_migr8(source_url, target_url, transformation: Path = None, limit: int = 0, progress: bool = False): """ + Transfer MongoDB collection using migr8. + Synopsis -------- export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost:4200/testdrive/demo @@ -83,6 +87,42 @@ def mongodb_copy(source_url, target_url, transformation: Path = None, limit: int return True +def mongodb_copy(source_url, target_url, transformation: Path = None, limit: int = 0, progress: bool = False): + """ + Transfer MongoDB collection using translator component. 
+ + Synopsis + -------- + export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost:4200/testdrive/demo + ctk load table mongodb://localhost:27017/testdrive/demo + """ + + # Decode database URL. + mongodb_address = DatabaseAddress.from_string(source_url) + mongodb_uri, mongodb_collection_address = mongodb_address.decode() + mongodb_database = mongodb_collection_address.schema + mongodb_collection = mongodb_collection_address.table + + logger.info(f"Invoking MongoDBFullLoad. mongodb_uri={mongodb_uri}") + + # Optionally configure transformations. + tm = None + if transformation: + tm = TransformationManager(path=transformation) + + # Invoke `full-load` procedure. + mdb_full = MongoDBFullLoad( + mongodb_url=str(mongodb_uri), + mongodb_database=mongodb_database, + mongodb_collection=mongodb_collection, + cratedb_url=target_url, + tm=tm, + progress=progress, + ) + mdb_full.start() + return True + + def mongodb_relay_cdc(source_url, target_url, progress: bool = False): """ Synopsis diff --git a/cratedb_toolkit/io/mongodb/copy.py b/cratedb_toolkit/io/mongodb/copy.py new file mode 100644 index 00000000..d57b71f8 --- /dev/null +++ b/cratedb_toolkit/io/mongodb/copy.py @@ -0,0 +1,122 @@ +# ruff: noqa: S608 +import logging +import typing as t + +import pymongo +import sqlalchemy as sa +from bson.raw_bson import RawBSONDocument +from commons_codec.model import SQLOperation +from commons_codec.transform.mongodb import MongoDBCDCTranslatorCrateDB +from tqdm import tqdm + +from cratedb_toolkit.io.mongodb.export import extract_value +from cratedb_toolkit.io.mongodb.transform import TransformationManager +from cratedb_toolkit.model import DatabaseAddress +from cratedb_toolkit.util import DatabaseAdapter + +logger = logging.getLogger(__name__) + + +class MongoDBFullLoadTranslator(MongoDBCDCTranslatorCrateDB): + def __init__(self, table_name: str, tm: TransformationManager = None): + super().__init__(table_name=table_name) + self.tm = tm + + @staticmethod + def get_document_key(record: 
t.Dict[str, t.Any]) -> str: + """ + Return value of document key (MongoDB document OID) from CDC record. + + "documentKey": {"_id": ObjectId("669683c2b0750b2c84893f3e")} + """ + return record["_id"] + + def to_sql(self, document: t.Dict[str, t.Any]) -> SQLOperation: + """ + Produce CrateDB INSERT SQL statement from MongoDB document. + """ + + # Define SQL INSERT statement. + sql = f"INSERT INTO {self.table_name} " f"({self.ID_COLUMN}, {self.DATA_COLUMN}) " "VALUES (:oid, :record);" + + # Converge MongoDB document to SQL parameters. + record = extract_value(self.decode_bson(document)) + oid: str = self.get_document_key(record) + parameters = {"oid": oid, "record": record} + + return SQLOperation(sql, parameters) + + + class MongoDBFullLoad: + """ + Copy MongoDB collection into CrateDB table. + """ + + def __init__( + self, + mongodb_url: str, + mongodb_database: str, + mongodb_collection: str, + cratedb_url: str, + tm: t.Union[TransformationManager, None], + mongodb_limit: int = 0, + progress: bool = False, + debug: bool = True, + ): + cratedb_address = DatabaseAddress.from_string(cratedb_url) + cratedb_sqlalchemy_url, cratedb_table_address = cratedb_address.decode() + cratedb_table = cratedb_table_address.fullname + + self.mongodb_client: pymongo.MongoClient = pymongo.MongoClient( + mongodb_url, + document_class=RawBSONDocument, + datetime_conversion="DATETIME_AUTO", + ) + self.mongodb_collection = self.mongodb_client[mongodb_database][mongodb_collection] + self.mongodb_limit = mongodb_limit + self.cratedb_adapter = DatabaseAdapter(str(cratedb_sqlalchemy_url), echo=False) + self.cratedb_table = self.cratedb_adapter.quote_relation_name(cratedb_table) + self.translator = MongoDBFullLoadTranslator(table_name=self.cratedb_table, tm=tm) + + self.progress = progress + self.debug = debug + + def start(self): + """ + Read items from MongoDB collection, convert to SQL INSERT statements, and submit to CrateDB. 
+ """ + records_in = self.mongodb_collection.count_documents(filter={}) + logger.info(f"Source: MongoDB collection={self.mongodb_collection} count={records_in}") + logger_on_error = logger.warning + if self.debug: + logger_on_error = logger.exception + with self.cratedb_adapter.engine.connect() as connection: + if not self.cratedb_adapter.table_exists(self.cratedb_table): + connection.execute(sa.text(self.translator.sql_ddl)) + connection.commit() + records_target = self.cratedb_adapter.count_records(self.cratedb_table) + logger.info(f"Target: CrateDB table={self.cratedb_table} count={records_target}") + progress_bar = tqdm(total=records_in) + records_out = 0 + + for document in self.mongodb_collection.find().limit(self.mongodb_limit): + result_size = 1 + + try: + operation = self.translator.to_sql(document) + logger.info("operation: %s", operation) + except Exception as ex: + logger_on_error(f"Transforming query failed: {ex}") + continue + try: + connection.execute(sa.text(operation.statement), operation.parameters) + records_out += result_size + progress_bar.update(n=result_size) + except Exception as ex: + logger_on_error(f"Executing query failed: {ex}") + + progress_bar.close() + connection.commit() + logger.info(f"Number of records written: {records_out}") + if records_out == 0: + logger.warning("No data has been copied") diff --git a/cratedb_toolkit/io/mongodb/export.py b/cratedb_toolkit/io/mongodb/export.py index bc301fc7..74ec9a6a 100644 --- a/cratedb_toolkit/io/mongodb/export.py +++ b/cratedb_toolkit/io/mongodb/export.py @@ -68,7 +68,7 @@ def extract_value(value, parent_type=None): if isinstance(value, dict): if len(value) == 1: if "$binary" in value and value["$binary"]["subType"] in ["03", "04"]: - decoded = UUID(bytes=base64.b64decode(value["$binary"]["base64"])) + decoded = str(UUID(bytes=base64.b64decode(value["$binary"]["base64"]))) return extract_value(decoded, parent_type) for k, v in value.items(): if k.startswith("$"): diff --git 
a/examples/zyp/zyp-int64-to-timestamp.yaml b/examples/zyp/zyp-int64-to-timestamp.yaml index 4b4b867c..f2bea269 100644 --- a/examples/zyp/zyp-int64-to-timestamp.yaml +++ b/examples/zyp/zyp-int64-to-timestamp.yaml @@ -13,5 +13,5 @@ collections: name: demo schema: rules: - - pointer: /timestamp + - pointer: /data/timestamp type: DATETIME diff --git a/tests/io/mongodb/test_cli.py b/tests/io/mongodb/test_cli.py index bb71ec04..7c324876 100644 --- a/tests/io/mongodb/test_cli.py +++ b/tests/io/mongodb/test_cli.py @@ -52,41 +52,28 @@ def test_version(): }, } DOCUMENT_OUT = { - "__id": mock.ANY, - "id": "d575540f-759d-4653-a4c4-4a9e410f1aa1", - "value": { - "name": "foobar", - "active": True, - "created": 1592579033000, - "timestamp": 1455141600000, - "list_date": [1592579033000, 1592579033000], - "list_empty": [], - "list_float": [42.42, 43.43], - "list_integer": [42, 43], - "list_object": [{"foo": "bar"}, {"baz": "qux"}], - "list_string": ["foo", "bar"], + "oid": mock.ANY, + "data": { + "_id": mock.ANY, + "id": "d575540f-759d-4653-a4c4-4a9e410f1aa1", + "value": { + "name": "foobar", + "active": True, + "created": 1592579033000, + "timestamp": 1455141600000, + "list_date": [1592579033000, 1592579033000], + "list_empty": [], + "list_float": [42.42, 43.43], + "list_integer": [42, 43], + "list_object": [{"foo": "bar"}, {"baz": "qux"}], + "list_string": ["foo", "bar"], + }, }, } DOCUMENT_DDL = """ CREATE TABLE IF NOT EXISTS "testdrive"."demo" ( - "__id" TEXT, - "id" TEXT, - "value" OBJECT(DYNAMIC) AS ( - "name" TEXT, - "active" BOOLEAN, - "created" TIMESTAMP WITH TIME ZONE, - "timestamp" TIMESTAMP WITH TIME ZONE, - "list_date" ARRAY(TIMESTAMP WITH TIME ZONE), - "list_empty" ARRAY(TEXT), - "list_float" ARRAY(REAL), - "list_integer" ARRAY(INTEGER), - "list_object" ARRAY(OBJECT(DYNAMIC) AS ( - "foo" TEXT, - "baz" TEXT - )), - "list_string" ARRAY(TEXT) - ) -)""".lstrip() + "oid" TEXT, + "data" OBJECT(DYNAMIC)""".lstrip() def test_mongodb_load_table_basic(caplog, cratedb, mongodb): 
diff --git a/tests/io/mongodb/test_transformation.py b/tests/io/mongodb/test_transformation.py index 1ef416cb..cabf0c29 100644 --- a/tests/io/mongodb/test_transformation.py +++ b/tests/io/mongodb/test_transformation.py @@ -1,7 +1,6 @@ from pathlib import Path import pytest -from sqlalchemy import TIMESTAMP from tests.conftest import check_sqlalchemy2 @@ -22,6 +21,7 @@ def check_prerequisites(): check_sqlalchemy2() +@pytest.mark.skip("Wishful thinking with single column strategy") def test_mongodb_copy_transform_timestamp(caplog, cratedb, mongodb): """ Verify MongoDB -> CrateDB data transfer with transformation. @@ -45,9 +45,11 @@ def test_mongodb_copy_transform_timestamp(caplog, cratedb, mongodb): # Verify data in target database. cratedb.database.refresh_table("testdrive.demo") results = cratedb.database.run_sql("SELECT * FROM testdrive.demo;", records=True) - assert results[0]["timestamp"] == 1563051934000 + assert results[0]["data"]["timestamp"] == 1563051934000 # Verify schema in target database. - columns = cratedb.database.describe_table_columns("testdrive.demo") - timestamp_type = columns[3]["type"] - assert isinstance(timestamp_type, TIMESTAMP) + type_result = cratedb.database.run_sql( + "SELECT pg_typeof(data['timestamp']) AS type FROM testdrive.demo;", records=True + ) + timestamp_type = type_result[0]["type"] + assert timestamp_type == "TIMESTAMP WITH TIME ZONE" From befd57917eec6d0d5ac16719be0894b93b538dae Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Tue, 3 Sep 2024 05:43:37 +0200 Subject: [PATCH 10/12] MongoDB: Sanitize lists of varying objects --- CHANGES.md | 1 + cratedb_toolkit/io/mongodb/copy.py | 9 +++--- cratedb_toolkit/io/mongodb/export.py | 45 ++++++++++++++++++++++++++++ tests/io/mongodb/test_cli.py | 10 +++++-- 4 files changed, 58 insertions(+), 7 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index e674ba36..5c43831e 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -11,6 +11,7 @@ - MongoDB: Skip leaking `UNKNOWN` fields into SQL DDL. 
This means relevant column definitions will not be included into the SQL DDL. - MongoDB: Make `ctk load table` use the `data OBJECT(DYNAMIC)` mapping strategy. +- MongoDB: Sanitize lists of varying objects ## 2024/09/02 v0.0.21 - DynamoDB: Add special decoding for varied lists. diff --git a/cratedb_toolkit/io/mongodb/copy.py b/cratedb_toolkit/io/mongodb/copy.py index d57b71f8..a735a1bc 100644 --- a/cratedb_toolkit/io/mongodb/copy.py +++ b/cratedb_toolkit/io/mongodb/copy.py @@ -37,7 +37,7 @@ def to_sql(self, document: t.Dict[str, t.Any]) -> SQLOperation: """ # Define SQL INSERT statement. - sql = f"INSERT INTO {self.table_name} " f"({self.ID_COLUMN}, {self.DATA_COLUMN}) " "VALUES (:oid, :record);" + sql = f"INSERT INTO {self.table_name} ({self.ID_COLUMN}, {self.DATA_COLUMN}) VALUES (:oid, :record);" # Converge MongoDB document to SQL parameters. record = extract_value(self.decode_bson(document)) @@ -100,16 +100,15 @@ def start(self): records_out = 0 for document in self.mongodb_collection.find().limit(self.mongodb_limit): - result_size = 1 - try: operation = self.translator.to_sql(document) - logger.info("operation: %s", operation) + logger.debug("SQL operation: %s", operation) except Exception as ex: logger_on_error(f"Transforming query failed: {ex}") continue try: - connection.execute(sa.text(operation.statement), operation.parameters) + result = connection.execute(sa.text(operation.statement), operation.parameters) + result_size = result.rowcount records_out += result_size progress_bar.update(n=result_size) except Exception as ex: diff --git a/cratedb_toolkit/io/mongodb/export.py b/cratedb_toolkit/io/mongodb/export.py index 74ec9a6a..2e5d711b 100644 --- a/cratedb_toolkit/io/mongodb/export.py +++ b/cratedb_toolkit/io/mongodb/export.py @@ -25,6 +25,7 @@ """ import base64 +import builtins import calendar import typing as t from uuid import UUID @@ -33,6 +34,7 @@ import dateutil.parser as dateparser import orjson as json import pymongo.collection +from attrs import 
define from cratedb_toolkit.io.mongodb.transform import TransformationManager from cratedb_toolkit.io.mongodb.util import sanitize_field_names @@ -75,6 +77,10 @@ def extract_value(value, parent_type=None): return extract_value(v, k.lstrip("$")) return {k.lstrip("$"): extract_value(v, parent_type) for (k, v) in value.items()} if isinstance(value, list): + if value and isinstance(value[0], dict): + lovos = ListOfVaryingObjectsSanitizer(value) + lovos.apply() + return [extract_value(v, parent_type) for v in value] if parent_type: converter = type_converter.get(parent_type) @@ -83,6 +89,45 @@ def extract_value(value, parent_type=None): return value +@define +class ListOfVaryingObjectsSanitizer: + """ + CrateDB can not store lists of varying objects, so normalize them. + """ + + data: t.List[t.Dict[str, t.Any]] + + def apply(self): + self.apply_rules(self.get_rules(self.type_stats())) + + def type_stats(self) -> t.Dict[str, t.List[str]]: + types: t.Dict[str, t.List[str]] = {} + for item in self.data: + for key, value in item.items(): + types.setdefault(key, []).append(type(value).__name__) + return types + + def get_rules(self, all_types): + rules = [] + for name, types in all_types.items(): + if len(types) > 1: + rules.append({"name": name, "converter": self.get_best_converter(types)}) + return rules + + def apply_rules(self, rules): + for item in self.data: + for rule in rules: + name = rule["name"] + if name in item: + item[name] = rule["converter"](item[name]) + + @staticmethod + def get_best_converter(types: t.List[str]) -> t.Callable: + if "str" in types: + return builtins.str + return lambda x: x + + def convert(d): """ Decode MongoDB Extended JSON, considering CrateDB specifics. 
diff --git a/tests/io/mongodb/test_cli.py b/tests/io/mongodb/test_cli.py index 7c324876..a97b7243 100644 --- a/tests/io/mongodb/test_cli.py +++ b/tests/io/mongodb/test_cli.py @@ -47,7 +47,10 @@ def test_version(): "list_empty": [], "list_float": [42.42, 43.43], "list_integer": [42, 43], - "list_object": [{"foo": "bar"}, {"baz": "qux"}], + "list_object_symmetric": [{"foo": "bar"}, {"baz": "qux"}], + "list_object_varying_string": [{"value": 42}, {"value": "qux"}], + # TODO: Improve decoding of inner items. + "list_object_varying_date": [{"value": DATETIME}, {"value": "qux"}], "list_string": ["foo", "bar"], }, } @@ -65,7 +68,10 @@ def test_version(): "list_empty": [], "list_float": [42.42, 43.43], "list_integer": [42, 43], - "list_object": [{"foo": "bar"}, {"baz": "qux"}], + "list_object_symmetric": [{"foo": "bar"}, {"baz": "qux"}], + "list_object_varying_string": [{"value": "42"}, {"value": "qux"}], + # TODO: Improve decoding of inner items. + "list_object_varying_date": [{"value": "{'$date': '2020-06-19T15:03:53.727Z'}"}, {"value": "qux"}], "list_string": ["foo", "bar"], }, }, From 4297f5d07c5cbc6bc84573f8da36ac4c9b2abea3 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Tue, 3 Sep 2024 13:18:22 +0200 Subject: [PATCH 11/12] MongoDB: Accept Zyp Treatments, and ingress pagination / egress batching - Use `--transformation` option for applying special treatments. Certain fields should be stored as lists, some need to be ignored for now, others need to be treated manually, etc. - Use pagination on source collection, for creating batches towards CrateDB. 
--- CHANGES.md | 3 + cratedb_toolkit/api/main.py | 5 +- cratedb_toolkit/io/mongodb/copy.py | 78 ++++++++++++---- cratedb_toolkit/io/mongodb/export.py | 114 +++++++++++------------- cratedb_toolkit/io/mongodb/model.py | 3 + cratedb_toolkit/io/mongodb/util.py | 4 +- cratedb_toolkit/util/config.py | 55 ++++++++++++ examples/zyp/zyp-treatment-all.yaml | 24 +++++ examples/zyp/zyp-treatment-ignore.yaml | 11 +++ pyproject.toml | 4 +- tests/io/mongodb/conftest.py | 6 ++ tests/io/mongodb/test_cli.py | 65 ++++++++++++-- tests/io/mongodb/test_export.py | 107 ++++++++++++++++++++++ tests/io/mongodb/test_extract.py | 11 +-- tests/io/mongodb/test_integration.py | 4 +- tests/io/mongodb/test_transformation.py | 82 +++++++++++++++-- tests/io/mongodb/test_util.py | 2 +- 17 files changed, 467 insertions(+), 111 deletions(-) create mode 100644 cratedb_toolkit/io/mongodb/model.py create mode 100644 cratedb_toolkit/util/config.py create mode 100644 examples/zyp/zyp-treatment-all.yaml create mode 100644 examples/zyp/zyp-treatment-ignore.yaml create mode 100644 tests/io/mongodb/test_export.py diff --git a/CHANGES.md b/CHANGES.md index 5c43831e..84f29ad9 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -12,6 +12,9 @@ This means relevant column definitions will not be included into the SQL DDL. - MongoDB: Make `ctk load table` use the `data OBJECT(DYNAMIC)` mapping strategy. - MongoDB: Sanitize lists of varying objects +- MongoDB: Add `--treatment` option for applying special treatments to certain items + on real-world data +- MongoDB: Use pagination on source collection, for creating batches towards CrateDB ## 2024/09/02 v0.0.21 - DynamoDB: Add special decoding for varied lists. 
diff --git a/cratedb_toolkit/api/main.py b/cratedb_toolkit/api/main.py index 01a3d029..4706a132 100644 --- a/cratedb_toolkit/api/main.py +++ b/cratedb_toolkit/api/main.py @@ -37,7 +37,10 @@ def __post_init__(self): logger.info(f"Connecting to CrateDB Cloud Cluster: {self.cloud_id}") def load_table( - self, resource: InputOutputResource, target: t.Optional[TableAddress] = None, transformation: Path = None + self, + resource: InputOutputResource, + target: t.Optional[TableAddress] = None, + transformation: Path = None, ): """ Load data into a database table on CrateDB Cloud. diff --git a/cratedb_toolkit/io/mongodb/copy.py b/cratedb_toolkit/io/mongodb/copy.py index a735a1bc..bd9c3ff9 100644 --- a/cratedb_toolkit/io/mongodb/copy.py +++ b/cratedb_toolkit/io/mongodb/copy.py @@ -8,8 +8,12 @@ from commons_codec.model import SQLOperation from commons_codec.transform.mongodb import MongoDBCDCTranslatorCrateDB from tqdm import tqdm +from tqdm.contrib.logging import logging_redirect_tqdm +from yarl import URL +from zyp.model.collection import CollectionAddress -from cratedb_toolkit.io.mongodb.export import extract_value +from cratedb_toolkit.io.mongodb.export import CrateDBConverter +from cratedb_toolkit.io.mongodb.model import DocumentDict from cratedb_toolkit.io.mongodb.transform import TransformationManager from cratedb_toolkit.model import DatabaseAddress from cratedb_toolkit.util import DatabaseAdapter @@ -18,8 +22,13 @@ class MongoDBFullLoadTranslator(MongoDBCDCTranslatorCrateDB): - def __init__(self, table_name: str, tm: TransformationManager = None): + """ + Translate a MongoDB document into a CrateDB document. 
+ """ + + def __init__(self, table_name: str, converter: CrateDBConverter, tm: TransformationManager = None): super().__init__(table_name=table_name) + self.converter = converter self.tm = tm @staticmethod @@ -31,18 +40,22 @@ def get_document_key(record: t.Dict[str, t.Any]) -> str: """ return record["_id"] - def to_sql(self, document: t.Dict[str, t.Any]) -> SQLOperation: + def to_sql(self, data: t.Union[DocumentDict, t.List[DocumentDict]]) -> SQLOperation: """ - Produce CrateDB INSERT SQL statement from MongoDB document. + Produce CrateDB SQL INSERT batch operation from multiple MongoDB documents. """ + if not isinstance(data, list): + data = [data] # Define SQL INSERT statement. sql = f"INSERT INTO {self.table_name} ({self.ID_COLUMN}, {self.DATA_COLUMN}) VALUES (:oid, :record);" - # Converge MongoDB document to SQL parameters. - record = extract_value(self.decode_bson(document)) - oid: str = self.get_document_key(record) - parameters = {"oid": oid, "record": record} + # Converge multiple MongoDB documents into SQL parameters for `executemany` operation. 
+ parameters: t.List[DocumentDict] = [] + for document in data: + record = self.converter.convert(self.decode_bson(document)) + oid: str = self.get_document_key(record) + parameters.append({"oid": oid, "record": record}) return SQLOperation(sql, parameters) @@ -60,6 +73,7 @@ def __init__( cratedb_url: str, tm: t.Union[TransformationManager, None], mongodb_limit: int = 0, + on_error: t.Literal["ignore", "raise"] = "ignore", progress: bool = False, debug: bool = True, ): @@ -67,6 +81,7 @@ def __init__( cratedb_sqlalchemy_url, cratedb_table_address = cratedb_address.decode() cratedb_table = cratedb_table_address.fullname + self.mongodb_uri = URL(mongodb_url) self.mongodb_client: pymongo.MongoClient = pymongo.MongoClient( mongodb_url, document_class=RawBSONDocument, @@ -76,11 +91,20 @@ def __init__( self.mongodb_limit = mongodb_limit self.cratedb_adapter = DatabaseAdapter(str(cratedb_sqlalchemy_url), echo=False) self.cratedb_table = self.cratedb_adapter.quote_relation_name(cratedb_table) - self.translator = MongoDBFullLoadTranslator(table_name=self.cratedb_table, tm=tm) + # Transformation machinery. + transformation = None + if tm: + transformation = tm.project.get(CollectionAddress(container=mongodb_database, name=mongodb_collection)) + self.converter = CrateDBConverter(transformation=transformation) + self.translator = MongoDBFullLoadTranslator(table_name=self.cratedb_table, converter=self.converter, tm=tm) + + self.on_error = on_error self.progress = progress self.debug = debug + self.batch_size: int = int(self.mongodb_uri.query.get("batch-size", 250)) + def start(self): """ Read items from DynamoDB table, convert to SQL INSERT statements, and submit to CrateDB. 
@@ -90,29 +114,49 @@ def start(self): logger_on_error = logger.warning if self.debug: logger_on_error = logger.exception - with self.cratedb_adapter.engine.connect() as connection: + with self.cratedb_adapter.engine.connect() as connection, logging_redirect_tqdm(): if not self.cratedb_adapter.table_exists(self.cratedb_table): connection.execute(sa.text(self.translator.sql_ddl)) connection.commit() records_target = self.cratedb_adapter.count_records(self.cratedb_table) logger.info(f"Target: CrateDB table={self.cratedb_table} count={records_target}") progress_bar = tqdm(total=records_in) - records_out = 0 + records_out: int = 0 - for document in self.mongodb_collection.find().limit(self.mongodb_limit): + skip: int = 0 + while True: + progress_bar.set_description("ACQUIRE") + # Acquire batch of documents, and convert to SQL operation. + documents = self.mongodb_collection.find().skip(skip).limit(self.batch_size).batch_size(self.batch_size) try: - operation = self.translator.to_sql(document) - logger.debug("SQL operation: %s", operation) + operation = self.translator.to_sql(list(documents)) except Exception as ex: - logger_on_error(f"Transforming query failed: {ex}") - continue + logger_on_error(f"Computing query failed: {ex}") + if self.on_error == "raise": + raise + break + + # When input data is exhausted, stop processing. + progress_bar.set_description("CHECK") + if not operation.parameters: + break + + # Submit operation to CrateDB. + progress_bar.set_description("SUBMIT") try: result = connection.execute(sa.text(operation.statement), operation.parameters) result_size = result.rowcount + if result_size < 0: + raise ValueError("Unable to insert one or more records") records_out += result_size progress_bar.update(n=result_size) except Exception as ex: - logger_on_error(f"Executing query failed: {ex}") + logger_on_error(f"Executing operation failed: {ex}\nOperation:\n{operation}") + if self.on_error == "raise": + raise + + # Next page. 
+ skip += self.batch_size progress_bar.close() connection.commit() diff --git a/cratedb_toolkit/io/mongodb/export.py b/cratedb_toolkit/io/mongodb/export.py index 2e5d711b..dbeae37c 100644 --- a/cratedb_toolkit/io/mongodb/export.py +++ b/cratedb_toolkit/io/mongodb/export.py @@ -25,8 +25,8 @@ """ import base64 -import builtins import calendar +import logging import typing as t from uuid import UUID @@ -34,11 +34,16 @@ import dateutil.parser as dateparser import orjson as json import pymongo.collection +from attr import Factory from attrs import define +from zyp.model.collection import CollectionTransformation +from cratedb_toolkit.io.mongodb.model import DocumentDict from cratedb_toolkit.io.mongodb.transform import TransformationManager from cratedb_toolkit.io.mongodb.util import sanitize_field_names +logger = logging.getLogger(__name__) + def date_converter(value): if isinstance(value, int): @@ -60,81 +65,64 @@ def timestamp_converter(value): } -def extract_value(value, parent_type=None): - """ - Decode MongoDB Extended JSON. 
- - - https://www.mongodb.com/docs/manual/reference/mongodb-extended-json-v1/ - - https://www.mongodb.com/docs/manual/reference/mongodb-extended-json/ - """ - if isinstance(value, dict): - if len(value) == 1: - if "$binary" in value and value["$binary"]["subType"] in ["03", "04"]: - decoded = str(UUID(bytes=base64.b64decode(value["$binary"]["base64"]))) - return extract_value(decoded, parent_type) - for k, v in value.items(): - if k.startswith("$"): - return extract_value(v, k.lstrip("$")) - return {k.lstrip("$"): extract_value(v, parent_type) for (k, v) in value.items()} - if isinstance(value, list): - if value and isinstance(value[0], dict): - lovos = ListOfVaryingObjectsSanitizer(value) - lovos.apply() - - return [extract_value(v, parent_type) for v in value] - if parent_type: - converter = type_converter.get(parent_type) - if converter: - return converter(value) - return value - - @define -class ListOfVaryingObjectsSanitizer: - """ - CrateDB can not store lists of varying objects, so normalize them. - """ - - data: t.List[t.Dict[str, t.Any]] - - def apply(self): - self.apply_rules(self.get_rules(self.type_stats())) +class CrateDBConverter: + transformation: CollectionTransformation = Factory(CollectionTransformation) + + def convert(self, data: DocumentDict) -> t.Dict[str, t.Any]: + """ + Decode MongoDB Extended JSON, considering CrateDB specifics. + """ + return self.extract_value(data) + + def extract_value(self, value: t.Any, parent_type: t.Optional[str] = None) -> t.Any: + """ + Decode MongoDB Extended JSON. + + - https://www.mongodb.com/docs/manual/reference/mongodb-extended-json-v1/ + - https://www.mongodb.com/docs/manual/reference/mongodb-extended-json/ + """ + if isinstance(value, dict): + # Custom adjustments to compensate shape anomalies in source data. 
+ self.apply_special_treatments(value) + if len(value) == 1: + if "$binary" in value and value["$binary"]["subType"] in ["03", "04"]: + decoded = str(UUID(bytes=base64.b64decode(value["$binary"]["base64"]))) + return self.extract_value(decoded, parent_type) + for k, v in value.items(): + if k.startswith("$"): + return self.extract_value(v, k.lstrip("$")) + return {k.lstrip("$"): self.extract_value(v, parent_type) for (k, v) in value.items()} + if isinstance(value, list): + return [self.extract_value(v, parent_type) for v in value] + if parent_type: + converter = type_converter.get(parent_type) + if converter: + return converter(value) + return value - def type_stats(self) -> t.Dict[str, t.List[str]]: - types: t.Dict[str, t.List[str]] = {} - for item in self.data: - for key, value in item.items(): - types.setdefault(key, []).append(type(value).__name__) - return types + def apply_special_treatments(self, value: t.Any): + """ + Apply special treatments to value that can't be described otherwise up until now. + # Ignore certain items including anomalies that are not resolved, yet. - def get_rules(self, all_types): - rules = [] - for name, types in all_types.items(): - if len(types) > 1: - rules.append({"name": name, "converter": self.get_best_converter(types)}) - return rules + TODO: Needs an integration test feeding two records instead of just one. + """ - def apply_rules(self, rules): - for item in self.data: - for rule in rules: - name = rule["name"] - if name in item: - item[name] = rule["converter"](item[name]) + if self.transformation is None or self.transformation.treatment is None: + return None - @staticmethod - def get_best_converter(types: t.List[str]) -> t.Callable: - if "str" in types: - return builtins.str - return lambda x: x + return self.transformation.treatment.apply(value) def convert(d): """ Decode MongoDB Extended JSON, considering CrateDB specifics. 
""" + converter = CrateDBConverter() newdict = {} for k, v in sanitize_field_names(d).items(): - newdict[k] = extract_value(v) + newdict[k] = converter.convert(v) return newdict diff --git a/cratedb_toolkit/io/mongodb/model.py b/cratedb_toolkit/io/mongodb/model.py new file mode 100644 index 00000000..37e62548 --- /dev/null +++ b/cratedb_toolkit/io/mongodb/model.py @@ -0,0 +1,3 @@ +import typing as t + +DocumentDict = t.Dict[str, t.Any] diff --git a/cratedb_toolkit/io/mongodb/util.py b/cratedb_toolkit/io/mongodb/util.py index 2e5d0f6b..529e5f6b 100644 --- a/cratedb_toolkit/io/mongodb/util.py +++ b/cratedb_toolkit/io/mongodb/util.py @@ -1,6 +1,6 @@ import re -import typing as t +from cratedb_toolkit.io.mongodb.model import DocumentDict from cratedb_toolkit.util.data_dict import OrderedDictX @@ -26,7 +26,7 @@ def parse_input_numbers(s: str): return options -def sanitize_field_names(data: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]: +def sanitize_field_names(data: DocumentDict) -> DocumentDict: """ Rename top-level column names with single leading underscores to double leading underscores. CrateDB does not accept singe leading underscores, like `_id`. 
diff --git a/cratedb_toolkit/util/config.py b/cratedb_toolkit/util/config.py new file mode 100644 index 00000000..0e2114dd --- /dev/null +++ b/cratedb_toolkit/util/config.py @@ -0,0 +1,55 @@ +import typing as t +from collections import OrderedDict + +import attr +from attrs import define +from cattrs.preconf.json import make_converter as make_json_converter +from cattrs.preconf.pyyaml import make_converter as make_yaml_converter + + +@define +class Metadata: + version: t.Union[int, None] = None + type: t.Union[str, None] = None + + +@define +class Dumpable: + meta: t.Union[Metadata, None] = None + + def to_dict(self) -> t.Dict[str, t.Any]: + return attr.asdict(self, dict_factory=OrderedDict, filter=no_privates_no_nulls_no_empties) + + def to_json(self) -> str: + converter = make_json_converter(dict_factory=OrderedDict) + return converter.dumps(self.to_dict()) + + def to_yaml(self) -> str: + converter = make_yaml_converter(dict_factory=OrderedDict) + return converter.dumps(self.to_dict()) + + @classmethod + def from_dict(cls, data: t.Dict[str, t.Any]): + return cls(**data) + + @classmethod + def from_json(cls, json_str: str): + converter = make_json_converter(dict_factory=OrderedDict) + return converter.loads(json_str, cls) + + @classmethod + def from_yaml(cls, yaml_str: str): + converter = make_yaml_converter(dict_factory=OrderedDict) + return converter.loads(yaml_str, cls) + + +def no_privates_no_nulls_no_empties(key, value) -> bool: + """ + A filter for `attr.asdict`, to suppress private attributes. + """ + is_private = key.name.startswith("_") + is_null = value is None + is_empty = value == [] + if is_private or is_null or is_empty: + return False + return True diff --git a/examples/zyp/zyp-treatment-all.yaml b/examples/zyp/zyp-treatment-all.yaml new file mode 100644 index 00000000..cc353c76 --- /dev/null +++ b/examples/zyp/zyp-treatment-all.yaml @@ -0,0 +1,24 @@ +# Zyp transformation defining a few special treatments to be applied to inbound data. 
+--- +meta: + type: zyp-project + version: 1 +collections: +- address: + container: testdrive + name: demo + treatment: + ignore_complex_lists: false + normalize_complex_lists: true + ignore_field: + - "ignore_toplevel" + - "ignore_nested" + convert_list: + - "to_list" + convert_string: + - "to_string" + convert_dict: + - name: "to_dict_scalar" + wrapper_name: "id" + - name: "user" + wrapper_name: "id" diff --git a/examples/zyp/zyp-treatment-ignore.yaml b/examples/zyp/zyp-treatment-ignore.yaml new file mode 100644 index 00000000..6f0b6364 --- /dev/null +++ b/examples/zyp/zyp-treatment-ignore.yaml @@ -0,0 +1,11 @@ +# Zyp transformation defining a few special treatments to be applied to inbound data. +--- +meta: + type: zyp-project + version: 1 +collections: +- address: + container: testdrive + name: demo + treatment: + ignore_complex_lists: true diff --git a/pyproject.toml b/pyproject.toml index cf6b515c..a57abb83 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -83,7 +83,9 @@ dynamic = [ "version", ] dependencies = [ + "attrs<25", "boltons<25", + "cattrs<24", "click<9", "click-aliases<2,>=1.0.4", "colorama<1", @@ -159,7 +161,7 @@ kinesis = [ "lorrystream[carabas]>=0.0.6", ] mongodb = [ - "commons-codec[mongodb,zyp]>=0.0.14", + "commons-codec[mongodb,zyp]>=0.0.15", "cratedb-toolkit[io]", "orjson<4,>=3.3.1", "pymongo<5,>=3.10.1", diff --git a/tests/io/mongodb/conftest.py b/tests/io/mongodb/conftest.py index 2361000e..419b98e6 100644 --- a/tests/io/mongodb/conftest.py +++ b/tests/io/mongodb/conftest.py @@ -7,6 +7,12 @@ logger = logging.getLogger(__name__) +pytest.importorskip("bson", reason="Skipping tests because bson is not installed") +pytest.importorskip("bsonjs", reason="Skipping tests because bsonjs is not installed") +pytest.importorskip("pymongo", reason="Skipping tests because pymongo is not installed") +pytest.importorskip("rich", reason="Skipping tests because rich is not installed") + + # Define databases to be deleted before running each test case. 
RESET_DATABASES = [ "testdrive", diff --git a/tests/io/mongodb/test_cli.py b/tests/io/mongodb/test_cli.py index a97b7243..9cdda9e5 100644 --- a/tests/io/mongodb/test_cli.py +++ b/tests/io/mongodb/test_cli.py @@ -1,22 +1,23 @@ import os +from copy import deepcopy +from pathlib import Path from unittest import mock from uuid import UUID +import bson import dateutil +import pymongo import pytest import sqlparse from click.testing import CliRunner from pueblo.testing.dataframe import DataFrameFactory +from toolz import dissoc from cratedb_toolkit.cli import cli from tests.conftest import check_sqlalchemy2 pytestmark = pytest.mark.mongodb -bson = pytest.importorskip("bson", reason="Skipping tests because bson is not installed") -pymongo = pytest.importorskip("pymongo", reason="Skipping tests because pymongo is not installed") -pytest.importorskip("rich", reason="Skipping tests because rich is not installed") - @pytest.fixture(scope="module", autouse=True) def check_prerequisites(): @@ -76,6 +77,14 @@ def test_version(): }, }, } +DOCUMENT_OUT_NO_COMPLEX_LISTS = deepcopy(DOCUMENT_OUT) +DOCUMENT_OUT_NO_COMPLEX_LISTS["data"]["value"] = dissoc( + DOCUMENT_OUT["data"]["value"], + "list_object_symmetric", + "list_object_varying_string", + "list_object_varying_date", +) + DOCUMENT_DDL = """ CREATE TABLE IF NOT EXISTS "testdrive"."demo" ( "oid" TEXT, @@ -114,12 +123,12 @@ def test_mongodb_load_table_basic(caplog, cratedb, mongodb): assert cratedb.database.count_records("testdrive.demo") == 42 -def test_mongodb_load_table_real(caplog, cratedb, mongodb): +def test_mongodb_load_table_complex_lists_normalize(caplog, cratedb, mongodb): """ CLI test: Invoke `ctk load table` for MongoDB. """ - cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo" mongodb_url = f"{mongodb.get_connection_url()}/testdrive/demo" + cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo" # Populate source database. 
client: pymongo.MongoClient = mongodb.get_connection_client() @@ -127,11 +136,13 @@ def test_mongodb_load_table_real(caplog, cratedb, mongodb): demo = testdrive.create_collection("demo") demo.insert_many([DOCUMENT_IN]) + transformation = Path("examples/zyp/zyp-treatment-all.yaml") + # Run transfer command. runner = CliRunner(env={"CRATEDB_SQLALCHEMY_URL": cratedb_url}) result = runner.invoke( cli, - args=f"load table {mongodb_url}", + args=f"load table {mongodb_url} --transformation={transformation}", catch_exceptions=False, ) assert result.exit_code == 0 @@ -150,3 +161,43 @@ def test_mongodb_load_table_real(caplog, cratedb, mongodb): sql = results[0][0] sql = sqlparse.format(sql) assert sql.startswith(DOCUMENT_DDL) + + +def test_mongodb_load_table_complex_lists_ignore(caplog, cratedb, mongodb): + """ + CLI test: Invoke `ctk load table` for MongoDB, with special parameter to ignore complex lists. + """ + mongodb_url = f"{mongodb.get_connection_url()}/testdrive/demo" + cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo" + + # Populate source database. + client: pymongo.MongoClient = mongodb.get_connection_client() + testdrive = client.get_database("testdrive") + demo = testdrive.create_collection("demo") + demo.insert_many([DOCUMENT_IN]) + + transformation = Path("examples/zyp/zyp-treatment-ignore.yaml") + + # Run transfer command. + runner = CliRunner(env={"CRATEDB_SQLALCHEMY_URL": cratedb_url}) + result = runner.invoke( + cli, + args=f"load table {mongodb_url} --transformation={transformation}", + catch_exceptions=False, + ) + assert result.exit_code == 0 + + # Verify metadata in target database. + assert cratedb.database.table_exists("testdrive.demo") is True + assert cratedb.database.refresh_table("testdrive.demo") is True + assert cratedb.database.count_records("testdrive.demo") == 1 + + # Verify content in target database. 
+ results = cratedb.database.run_sql("SELECT * FROM testdrive.demo", records=True) + assert results[0] == DOCUMENT_OUT_NO_COMPLEX_LISTS + + # Verify schema in target database. + results = cratedb.database.run_sql("SHOW CREATE TABLE testdrive.demo") + sql = results[0][0] + sql = sqlparse.format(sql) + assert sql.startswith(DOCUMENT_DDL) diff --git a/tests/io/mongodb/test_export.py b/tests/io/mongodb/test_export.py new file mode 100644 index 00000000..46666a08 --- /dev/null +++ b/tests/io/mongodb/test_export.py @@ -0,0 +1,107 @@ +import pytest +from zyp.model.collection import CollectionTransformation +from zyp.model.treatment import Treatment + +from cratedb_toolkit.io.mongodb.export import CrateDBConverter + +pytestmark = pytest.mark.mongodb + + +def test_convert_basic(): + data_in = { + "_id": { + "$oid": "56027fcae4b09385a85f9344", + }, + "value": { + "id": 42, + "date": {"$date": "2015-09-23T10:32:42.33Z"}, + "list": [ + {"id": "foo", "value": "something"}, + {"id": "bar", "value": {"$date": "2015-09-24T10:32:42.33Z"}}, + ], + }, + } + + data_out = { + "_id": "56027fcae4b09385a85f9344", + "value": { + "date": 1443004362000, + "id": 42, + "list": [ + {"id": "foo", "value": "something"}, + {"id": "bar", "value": 1443090762000}, + ], + }, + } + converter = CrateDBConverter() + assert converter.convert(data_in) == data_out + + +def test_convert_with_treatment_ignore_complex_list(): + data_in = { + "_id": { + "$oid": "56027fcae4b09385a85f9344", + }, + "value": { + "id": 42, + "date": {"$date": "2015-09-23T10:32:42.33Z"}, + "some_complex_list": [ + {"id": "foo", "value": "something"}, + {"id": "bar", "value": {"$date": "2015-09-24T10:32:42.33Z"}}, + ], + }, + } + + data_out = { + "_id": "56027fcae4b09385a85f9344", + "value": { + "date": 1443004362000, + "id": 42, + }, + } + + treatment = Treatment(ignore_complex_lists=True) + converter = CrateDBConverter(transformation=CollectionTransformation(treatment=treatment)) + assert converter.convert(data_in) == data_out + + 
+def test_convert_with_treatment_all_options(): + data_in = { + "_id": { + "$oid": "56027fcae4b09385a85f9344", + }, + "ignore_toplevel": 42, + "value": { + "id": 42, + "date": {"$date": "2015-09-23T10:32:42.33Z"}, + "ignore_nested": 42, + }, + "to_list": 42, + "to_string": 42, + "to_dict_scalar": 42, + "to_dict_list": [{"user": 42}], + } + + data_out = { + "_id": "56027fcae4b09385a85f9344", + "value": { + "date": 1443004362000, + "id": 42, + }, + "to_list": [42], + "to_string": "42", + "to_dict_scalar": {"id": 42}, + "to_dict_list": [{"user": {"id": 42}}], + } + treatment = Treatment( + ignore_complex_lists=False, + ignore_field=["ignore_toplevel", "ignore_nested"], + convert_list=["to_list"], + convert_string=["to_string"], + convert_dict=[ + {"name": "to_dict_scalar", "wrapper_name": "id"}, + {"name": "user", "wrapper_name": "id"}, + ], + ) + converter = CrateDBConverter(transformation=CollectionTransformation(treatment=treatment)) + assert converter.convert(data_in) == data_out diff --git a/tests/io/mongodb/test_extract.py b/tests/io/mongodb/test_extract.py index c5cf5a85..63fe1b04 100644 --- a/tests/io/mongodb/test_extract.py +++ b/tests/io/mongodb/test_extract.py @@ -3,18 +3,13 @@ import unittest from collections import OrderedDict -import pytest - -pytestmark = pytest.mark.mongodb - -pytest.importorskip("bson", reason="Skipping tests because bson is not installed") -pytest.importorskip("pymongo", reason="Skipping tests because pymongo is not installed") -pytest.importorskip("rich", reason="Skipping tests because rich is not installed") - import bson +import pytest from cratedb_toolkit.io.mongodb import extract +pytestmark = pytest.mark.mongodb + class TestExtractTypes(unittest.TestCase): def test_primitive_types(self): diff --git a/tests/io/mongodb/test_integration.py b/tests/io/mongodb/test_integration.py index 7574e7d3..57526efc 100644 --- a/tests/io/mongodb/test_integration.py +++ b/tests/io/mongodb/test_integration.py @@ -4,15 +4,13 @@ import unittest 
from unittest import mock +import pymongo import pytest from tests.conftest import check_sqlalchemy2 pytestmark = pytest.mark.mongodb -pymongo = pytest.importorskip("pymongo", reason="Skipping tests because pymongo is not installed") -pytest.importorskip("rich", reason="Skipping tests because rich is not installed") - @pytest.fixture(scope="module", autouse=True) def check_prerequisites(): diff --git a/tests/io/mongodb/test_transformation.py b/tests/io/mongodb/test_transformation.py index cabf0c29..ae063f15 100644 --- a/tests/io/mongodb/test_transformation.py +++ b/tests/io/mongodb/test_transformation.py @@ -1,17 +1,14 @@ from pathlib import Path +from unittest import mock +import pymongo import pytest +from cratedb_toolkit.io.mongodb.api import mongodb_copy from tests.conftest import check_sqlalchemy2 pytestmark = pytest.mark.mongodb -pymongo = pytest.importorskip("pymongo", reason="Skipping tests because pymongo is not installed") -pytest.importorskip("bsonjs", reason="Skipping tests because bsonjs is not installed") -pytest.importorskip("rich", reason="Skipping tests because rich is not installed") - -from cratedb_toolkit.io.mongodb.api import mongodb_copy # noqa: E402 - @pytest.fixture(scope="module", autouse=True) def check_prerequisites(): @@ -21,7 +18,6 @@ def check_prerequisites(): check_sqlalchemy2() -@pytest.mark.skip("Wishful thinking with single column strategy") def test_mongodb_copy_transform_timestamp(caplog, cratedb, mongodb): """ Verify MongoDB -> CrateDB data transfer with transformation. @@ -52,4 +48,74 @@ def test_mongodb_copy_transform_timestamp(caplog, cratedb, mongodb): "SELECT pg_typeof(data['timestamp']) AS type FROM testdrive.demo;", records=True ) timestamp_type = type_result[0]["type"] - assert timestamp_type == "TIMESTAMP WITH TIME ZONE" + assert timestamp_type == "bigint" + + # FIXME: Only works with a defined schema. 
+ # assert timestamp_type == "TIMESTAMP WITH TIME ZONE" # noqa: ERA001 + + +def test_mongodb_copy_treatment_all(caplog, cratedb, mongodb): + """ + Verify MongoDB -> CrateDB data transfer with Zyp Treatment transformation. + """ + + data_in = { + "ignore_toplevel": 42, + "value": { + "id": 42, + "date": {"$date": "2015-09-23T10:32:42.33Z"}, + "ignore_nested": 42, + }, + "to_list": 42, + "to_string": 42, + "to_dict_scalar": 42, + "to_dict_list": [{"user": 42}], + } + + data_out = { + "oid": mock.ANY, + "data": { + "_id": mock.ANY, + "value": { + "date": 1443004362000, + "id": 42, + }, + "to_list": [42], + "to_string": "42", + "to_dict_scalar": {"id": 42}, + "to_dict_list": [{"user": {"id": 42}}], + }, + } + + cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo" + mongodb_url = f"{mongodb.get_connection_url()}/testdrive/demo" + + # Populate source database. + client: pymongo.MongoClient = mongodb.get_connection_client() + testdrive = client.get_database("testdrive") + demo = testdrive.create_collection("demo") + demo.insert_one(data_in) + + # Run transfer command. + mongodb_copy( + mongodb_url, + cratedb_url, + transformation=Path("examples/zyp/zyp-treatment-all.yaml"), + ) + + # Verify data in target database. + cratedb.database.refresh_table("testdrive.demo") + results = cratedb.database.run_sql("SELECT * FROM testdrive.demo;", records=True) + assert results[0] == data_out + return + assert results[0]["data"]["timestamp"] == 1563051934000 + + # Verify schema in target database. + type_result = cratedb.database.run_sql( + "SELECT pg_typeof(data['timestamp']) AS type FROM testdrive.demo;", records=True + ) + timestamp_type = type_result[0]["type"] + assert timestamp_type == "bigint" + + # FIXME: Only works with a defined schema. 
+ # assert timestamp_type == "TIMESTAMP WITH TIME ZONE" # noqa: ERA001 diff --git a/tests/io/mongodb/test_util.py b/tests/io/mongodb/test_util.py index 0146d0cf..dc27cdb1 100644 --- a/tests/io/mongodb/test_util.py +++ b/tests/io/mongodb/test_util.py @@ -2,7 +2,7 @@ import pytest -from cratedb_toolkit.io.mongodb.util import parse_input_numbers, sanitize_field_names +from cratedb_toolkit.io.mongodb.util import parse_input_numbers, sanitize_field_names # noqa: E402 pytestmark = pytest.mark.mongodb From cf8213d8e449818ddd93ae232acc2ade649c4c6c Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Mon, 9 Sep 2024 12:43:58 +0200 Subject: [PATCH 12/12] MongoDB: Unlock importing MongoDB Extended JSON using `file+bson://...` --- CHANGES.md | 1 + cratedb_toolkit/api/main.py | 68 ++++++---- cratedb_toolkit/io/mongodb/adapter.py | 134 +++++++++++++++++++ cratedb_toolkit/io/mongodb/api.py | 21 +-- cratedb_toolkit/io/mongodb/copy.py | 62 ++++----- cratedb_toolkit/io/mongodb/model.py | 1 + cratedb_toolkit/io/mongodb/util.py | 22 +++- cratedb_toolkit/model.py | 3 +- cratedb_toolkit/util/database.py | 6 + doc/io/mongodb/loader.md | 160 +++++++++++++++++++---- examples/zyp/zyp-mongodb-json-files.yaml | 26 ++++ pyproject.toml | 1 + tests/io/mongodb/books-canonical.ndjson | 4 + tests/io/mongodb/books-relaxed.ndjson | 4 + tests/io/mongodb/books.bson.gz | Bin 0 -> 2452 bytes tests/io/mongodb/conftest.py | 1 + tests/io/mongodb/test_copy.py | 115 ++++++++++++++++ 17 files changed, 526 insertions(+), 103 deletions(-) create mode 100644 cratedb_toolkit/io/mongodb/adapter.py create mode 100644 examples/zyp/zyp-mongodb-json-files.yaml create mode 100644 tests/io/mongodb/books-canonical.ndjson create mode 100644 tests/io/mongodb/books-relaxed.ndjson create mode 100644 tests/io/mongodb/books.bson.gz create mode 100644 tests/io/mongodb/test_copy.py diff --git a/CHANGES.md b/CHANGES.md index 84f29ad9..aa1cd01c 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -15,6 +15,7 @@ - MongoDB: Add `--treatment` 
option for applying special treatments to certain items on real-world data - MongoDB: Use pagination on source collection, for creating batches towards CrateDB +- MongoDB: Unlock importing MongoDB Extended JSON files using `file+bson://...` ## 2024/09/02 v0.0.21 - DynamoDB: Add special decoding for varied lists. diff --git a/cratedb_toolkit/api/main.py b/cratedb_toolkit/api/main.py index 4706a132..bb416cb6 100644 --- a/cratedb_toolkit/api/main.py +++ b/cratedb_toolkit/api/main.py @@ -6,7 +6,7 @@ from abc import abstractmethod from pathlib import Path -from yarl import URL +from boltons.urlutils import URL from cratedb_toolkit.api.guide import GuidingTexts from cratedb_toolkit.cluster.util import get_cluster_info @@ -113,46 +113,66 @@ def load_table(self, resource: InputOutputResource, target: TableAddress, transf ctk load table influxdb2://example:token@localhost:8086/testdrive/demo ctk load table mongodb://localhost:27017/testdrive/demo """ - source_url = resource.url + source_url_obj = URL(resource.url) target_url = self.address.dburi - source_url_obj = URL(source_url) - if source_url.startswith("dynamodb"): + + if source_url_obj.scheme.startswith("dynamodb"): from cratedb_toolkit.io.dynamodb.api import dynamodb_copy - if not dynamodb_copy(source_url, target_url, progress=True): + if not dynamodb_copy(str(source_url_obj), target_url, progress=True): msg = "Data loading failed" logger.error(msg) raise OperationFailed(msg) - elif source_url.startswith("influxdb"): + elif source_url_obj.scheme.startswith("file"): + if "+bson" in source_url_obj.scheme or "+mongodb" in source_url_obj.scheme: + mongodb_copy_generic( + str(source_url_obj), + target_url, + transformation=transformation, + progress=True, + ) + + elif source_url_obj.scheme.startswith("influxdb"): from cratedb_toolkit.io.influxdb import influxdb_copy - http_scheme = "http://" - if asbool(source_url_obj.query.get("ssl")): - http_scheme = "https://" - source_url = source_url.replace("influxdb2://", 
http_scheme) - if not influxdb_copy(source_url, target_url, progress=True): + http_scheme = "http" + if asbool(source_url_obj.query_params.get("ssl")): + http_scheme = "https" + source_url_obj.scheme = source_url_obj.scheme.replace("influxdb2", http_scheme) + if not influxdb_copy(str(source_url_obj), target_url, progress=True): msg = "Data loading failed" logger.error(msg) raise OperationFailed(msg) - elif source_url.startswith("mongodb"): - if "+cdc" in source_url: - source_url = source_url.replace("+cdc", "") + + elif source_url_obj.scheme.startswith("mongodb"): + if "+cdc" in source_url_obj.scheme: + source_url_obj.scheme = source_url_obj.scheme.replace("+cdc", "") from cratedb_toolkit.io.mongodb.api import mongodb_relay_cdc - mongodb_relay_cdc(source_url, target_url, progress=True) + mongodb_relay_cdc(str(source_url_obj), target_url, progress=True) else: - from cratedb_toolkit.io.mongodb.api import mongodb_copy - - if not mongodb_copy( - source_url, + mongodb_copy_generic( + str(source_url_obj), target_url, transformation=transformation, - limit=int(source_url_obj.query.get("limit", 0)), progress=True, - ): - msg = "Data loading failed" - logger.error(msg) - raise OperationFailed(msg) + ) else: raise NotImplementedError("Importing resource not implemented yet") + + +def mongodb_copy_generic( + source_url: str, target_url: str, transformation: t.Union[Path, None] = None, progress: bool = False +): + from cratedb_toolkit.io.mongodb.api import mongodb_copy + + if not mongodb_copy( + source_url, + target_url, + transformation=transformation, + progress=progress, + ): + msg = "Data loading failed" + logger.error(msg) + raise OperationFailed(msg) diff --git a/cratedb_toolkit/io/mongodb/adapter.py b/cratedb_toolkit/io/mongodb/adapter.py new file mode 100644 index 00000000..e539e778 --- /dev/null +++ b/cratedb_toolkit/io/mongodb/adapter.py @@ -0,0 +1,134 @@ +import itertools +import logging +import typing as t +from abc import abstractmethod +from functools import 
cached_property +from pathlib import Path + +import boltons.urlutils +import polars as pl +import pymongo +import yarl +from attrs import define, field +from boltons.urlutils import URL +from bson.raw_bson import RawBSONDocument +from undatum.common.iterable import IterableData + +from cratedb_toolkit.io.mongodb.util import batches +from cratedb_toolkit.model import DatabaseAddress + +logger = logging.getLogger(__name__) + + +@define +class MongoDBAdapterBase: + address: DatabaseAddress + effective_url: URL + database_name: str + collection_name: str + + _custom_query_parameters = ["batch-size", "limit", "offset"] + + @classmethod + def from_url(cls, url: t.Union[str, boltons.urlutils.URL, yarl.URL]): + if not isinstance(url, str): + url = str(url) + mongodb_address = DatabaseAddress.from_string(url) + mongodb_uri, mongodb_collection_address = mongodb_address.decode() + logger.info(f"Collection address: {mongodb_collection_address}") + mongodb_database = mongodb_collection_address.schema + mongodb_collection = mongodb_collection_address.table + for custom_query_parameter in cls._custom_query_parameters: + mongodb_uri.query_params.pop(custom_query_parameter, None) + return cls( + address=mongodb_address, + effective_url=mongodb_uri, + database_name=mongodb_database, + collection_name=mongodb_collection, + ) + + def __attrs_post_init__(self): + self.setup() + + @cached_property + def batch_size(self) -> int: + return int(self.address.uri.query_params.get("batch-size", 500)) + + @cached_property + def limit(self) -> int: + return int(self.address.uri.query_params.get("limit", 0)) + + @cached_property + def offset(self) -> int: + return int(self.address.uri.query_params.get("offset", 0)) + + @abstractmethod + def setup(self): + raise NotImplementedError() + + @abstractmethod + def record_count(self, filter_=None) -> int: + raise NotImplementedError() + + @abstractmethod + def query(self): + raise NotImplementedError() + + +@define +class 
MongoDBFileAdapter(MongoDBAdapterBase): + _path: Path = field(init=False) + + def setup(self): + self._path = Path(self.address.uri.path) + + def record_count(self, filter_=None) -> int: + """ + https://stackoverflow.com/a/27517681 + """ + f = open(self._path, "rb") + bufgen = itertools.takewhile(lambda x: x, (f.raw.read(1024 * 1024) for _ in itertools.repeat(None))) + return sum(buf.count(b"\n") for buf in bufgen if buf) + + def query(self): + if not self._path.exists(): + raise FileNotFoundError(f"Resource not found: {self._path}") + if self.offset: + raise NotImplementedError("Using offsets is not supported by Polars' NDJSON reader") + if self._path.suffix in [".ndjson", ".jsonl"]: + data = pl.read_ndjson(self._path, batch_size=self.batch_size, n_rows=self.limit or None).to_dicts() + elif ".bson" in str(self._path): + data = IterableData(str(self._path), options={"format_in": "bson"}).iter() + else: + raise ValueError(f"Unsupported file type: {self._path.suffix}") + return batches(data, self.batch_size) + + +@define +class MongoDBServerAdapter(MongoDBAdapterBase): + _mongodb_client: pymongo.MongoClient = field(init=False) + _mongodb_collection: pymongo.collection.Collection = field(init=False) + + def setup(self): + self._mongodb_client: pymongo.MongoClient = pymongo.MongoClient( + str(self.effective_url), + document_class=RawBSONDocument, + datetime_conversion="DATETIME_AUTO", + ) + self._mongodb_collection = self._mongodb_client[self.database_name][self.collection_name] + + def record_count(self, filter_=None) -> int: + filter_ = filter_ or {} + return self._mongodb_collection.count_documents(filter=filter_) + + def query(self): + data = self._mongodb_collection.find().batch_size(self.batch_size).skip(self.offset).limit(self.limit) + return batches(data, self.batch_size) + + +def mongodb_adapter_factory(mongodb_uri: URL) -> MongoDBAdapterBase: + if mongodb_uri.scheme.startswith("file"): + return MongoDBFileAdapter.from_url(mongodb_uri) + elif 
mongodb_uri.scheme.startswith("mongodb"): + return MongoDBServerAdapter.from_url(mongodb_uri) + raise ValueError("Unable to create MongoDB adapter") diff --git a/cratedb_toolkit/io/mongodb/api.py b/cratedb_toolkit/io/mongodb/api.py index e95890c9..35acb47b 100644 --- a/cratedb_toolkit/io/mongodb/api.py +++ b/cratedb_toolkit/io/mongodb/api.py @@ -1,5 +1,6 @@ import argparse import logging +import typing as t from pathlib import Path from cratedb_toolkit.io.mongodb.cdc import MongoDBCDCRelayCrateDB @@ -41,12 +42,11 @@ def mongodb_copy_migr8(source_url, target_url, transformation: Path = None, limi # 1. Extract schema from MongoDB collection. logger.info(f"Extracting schema from MongoDB: {mongodb_database}.{mongodb_collection}") extract_args = argparse.Namespace( - url=str(mongodb_uri), + url=str(mongodb_uri) + f"&limit={limit}", database=mongodb_database, collection=mongodb_collection, scan="partial", transformation=transformation, - limit=limit, ) mongodb_schema = extract(extract_args) count = mongodb_schema[mongodb_collection]["count"] @@ -75,11 +75,10 @@ def mongodb_copy_migr8(source_url, target_url, transformation: Path = None, limi f"source={mongodb_collection_address.fullname}, target={cratedb_table_address.fullname}" ) export_args = argparse.Namespace( - url=str(mongodb_uri), + url=str(mongodb_uri) + f"&limit={limit}", database=mongodb_database, collection=mongodb_collection, transformation=transformation, - limit=limit, ) buffer = export(export_args) cr8_insert_json(infile=buffer, hosts=cratedb_address.httpuri, table=cratedb_table_address.fullname) @@ -87,7 +86,7 @@ def mongodb_copy_migr8(source_url, target_url, transformation: Path = None, limi return True -def mongodb_copy(source_url, target_url, transformation: Path = None, limit: int = 0, progress: bool = False): +def mongodb_copy(source_url, target_url, transformation: t.Union[Path, None] = None, progress: bool = False): """ Transfer MongoDB collection using translator component. 
@@ -97,13 +96,7 @@ def mongodb_copy(source_url, target_url, transformation: Path = None, limit: int ctk load table mongodb://localhost:27017/testdrive/demo """ - # Decode database URL. - mongodb_address = DatabaseAddress.from_string(source_url) - mongodb_uri, mongodb_collection_address = mongodb_address.decode() - mongodb_database = mongodb_collection_address.schema - mongodb_collection = mongodb_collection_address.table - - logger.info(f"Invoking MongoDBFullLoad. mongodb_uri={mongodb_uri}") + logger.info(f"Invoking MongoDBFullLoad. source_url={source_url}") # Optionally configure transformations. tm = None @@ -112,9 +105,7 @@ def mongodb_copy(source_url, target_url, transformation: Path = None, limit: int # Invoke `full-load` procedure. mdb_full = MongoDBFullLoad( - mongodb_url=str(mongodb_uri), - mongodb_database=mongodb_database, - mongodb_collection=mongodb_collection, + mongodb_url=source_url, cratedb_url=target_url, tm=tm, progress=progress, diff --git a/cratedb_toolkit/io/mongodb/copy.py b/cratedb_toolkit/io/mongodb/copy.py index bd9c3ff9..f0980af1 100644 --- a/cratedb_toolkit/io/mongodb/copy.py +++ b/cratedb_toolkit/io/mongodb/copy.py @@ -2,16 +2,16 @@ import logging import typing as t -import pymongo import sqlalchemy as sa -from bson.raw_bson import RawBSONDocument +from boltons.urlutils import URL from commons_codec.model import SQLOperation from commons_codec.transform.mongodb import MongoDBCDCTranslatorCrateDB +from pymongo.cursor import Cursor from tqdm import tqdm from tqdm.contrib.logging import logging_redirect_tqdm -from yarl import URL from zyp.model.collection import CollectionAddress +from cratedb_toolkit.io.mongodb.adapter import mongodb_adapter_factory from cratedb_toolkit.io.mongodb.export import CrateDBConverter from cratedb_toolkit.io.mongodb.model import DocumentDict from cratedb_toolkit.io.mongodb.transform import TransformationManager @@ -44,6 +44,8 @@ def to_sql(self, data: t.Union[DocumentDict, t.List[DocumentDict]]) -> SQLOperat """ 
Produce CrateDB SQL INSERT batch operation from multiple MongoDB documents. """ + if isinstance(data, Cursor): + data = list(data) if not isinstance(data, list): data = [data] @@ -68,34 +70,34 @@ class MongoDBFullLoad: def __init__( self, mongodb_url: str, - mongodb_database: str, - mongodb_collection: str, cratedb_url: str, tm: t.Union[TransformationManager, None], - mongodb_limit: int = 0, - on_error: t.Literal["ignore", "raise"] = "ignore", + on_error: t.Literal["ignore", "raise"] = "raise", progress: bool = False, debug: bool = True, ): + # Decode database URL: MongoDB. + self.mongodb_uri = URL(mongodb_url) + self.mongodb_adapter = mongodb_adapter_factory(self.mongodb_uri) + + # Decode database URL: CrateDB. cratedb_address = DatabaseAddress.from_string(cratedb_url) cratedb_sqlalchemy_url, cratedb_table_address = cratedb_address.decode() cratedb_table = cratedb_table_address.fullname - self.mongodb_uri = URL(mongodb_url) - self.mongodb_client: pymongo.MongoClient = pymongo.MongoClient( - mongodb_url, - document_class=RawBSONDocument, - datetime_conversion="DATETIME_AUTO", - ) - self.mongodb_collection = self.mongodb_client[mongodb_database][mongodb_collection] - self.mongodb_limit = mongodb_limit self.cratedb_adapter = DatabaseAdapter(str(cratedb_sqlalchemy_url), echo=False) self.cratedb_table = self.cratedb_adapter.quote_relation_name(cratedb_table) # Transformation machinery. 
transformation = None if tm: - transformation = tm.project.get(CollectionAddress(container=mongodb_database, name=mongodb_collection)) + address = CollectionAddress( + container=self.mongodb_adapter.database_name, name=self.mongodb_adapter.collection_name + ) + try: + transformation = tm.project.get(address=address) + except KeyError: + pass self.converter = CrateDBConverter(transformation=transformation) self.translator = MongoDBFullLoadTranslator(table_name=self.cratedb_table, converter=self.converter, tm=tm) @@ -103,14 +105,12 @@ def __init__( self.progress = progress self.debug = debug - self.batch_size: int = int(self.mongodb_uri.query.get("batch-size", 250)) - def start(self): """ Read items from DynamoDB table, convert to SQL INSERT statements, and submit to CrateDB. """ - records_in = self.mongodb_collection.count_documents(filter={}) - logger.info(f"Source: MongoDB collection={self.mongodb_collection} count={records_in}") + records_in = self.mongodb_adapter.record_count() + logger.info(f"Source: MongoDB collection={self.mongodb_adapter.collection_name} count={records_in}") logger_on_error = logger.warning if self.debug: logger_on_error = logger.exception @@ -123,26 +123,20 @@ def start(self): progress_bar = tqdm(total=records_in) records_out: int = 0 - skip: int = 0 - while True: + # Acquire batches of documents, convert to SQL operations, and submit to CrateDB. + for documents in self.mongodb_adapter.query(): progress_bar.set_description("ACQUIRE") - # Acquire batch of documents, and convert to SQL operation. - documents = self.mongodb_collection.find().skip(skip).limit(self.batch_size).batch_size(self.batch_size) + try: - operation = self.translator.to_sql(list(documents)) + operation = self.translator.to_sql(documents) except Exception as ex: logger_on_error(f"Computing query failed: {ex}") if self.on_error == "raise": raise - break - - # When input data is exhausted, stop processing. 
- progress_bar.set_description("CHECK") - if not operation.parameters: - break + continue # Submit operation to CrateDB. - progress_bar.set_description("SUBMIT") + progress_bar.set_description("SUBMIT ") try: result = connection.execute(sa.text(operation.statement), operation.parameters) result_size = result.rowcount @@ -154,9 +148,7 @@ def start(self): logger_on_error(f"Executing operation failed: {ex}\nOperation:\n{operation}") if self.on_error == "raise": raise - - # Next page. - skip += self.batch_size + continue progress_bar.close() connection.commit() diff --git a/cratedb_toolkit/io/mongodb/model.py b/cratedb_toolkit/io/mongodb/model.py index 37e62548..559b0ad0 100644 --- a/cratedb_toolkit/io/mongodb/model.py +++ b/cratedb_toolkit/io/mongodb/model.py @@ -1,3 +1,4 @@ import typing as t DocumentDict = t.Dict[str, t.Any] +Documents = t.List[DocumentDict] diff --git a/cratedb_toolkit/io/mongodb/util.py b/cratedb_toolkit/io/mongodb/util.py index 529e5f6b..0aed69a8 100644 --- a/cratedb_toolkit/io/mongodb/util.py +++ b/cratedb_toolkit/io/mongodb/util.py @@ -1,6 +1,9 @@ import re +import typing as t -from cratedb_toolkit.io.mongodb.model import DocumentDict +from pymongo.cursor import Cursor + +from cratedb_toolkit.io.mongodb.model import DocumentDict, Documents from cratedb_toolkit.util.data_dict import OrderedDictX @@ -39,3 +42,20 @@ def sanitize_field_names(data: DocumentDict) -> DocumentDict: if name.startswith("_") and not name.startswith("__"): d.rename_key(name, f"_{name}") return d + + +def batches(data: t.Union[Cursor, Documents], batch_size: int = 100) -> t.Generator[Documents, None, None]: + """ + Generate batches of documents. 
+ """ + count = 0 + buffer = [] + for item in data: + buffer.append(item) + count += 1 + if count >= batch_size: + yield buffer + buffer = [] + count = 0 + if buffer: + yield buffer diff --git a/cratedb_toolkit/model.py b/cratedb_toolkit/model.py index c2b3716a..66f443d7 100644 --- a/cratedb_toolkit/model.py +++ b/cratedb_toolkit/model.py @@ -70,7 +70,8 @@ def decode(self) -> t.Tuple[URL, "TableAddress"]: database, table = decode_database_table(self.dburi) uri = deepcopy(self.uri) - uri.path = "" + if not uri.scheme.startswith("file"): + uri.path = "" return uri, TableAddress(database, table) diff --git a/cratedb_toolkit/util/database.py b/cratedb_toolkit/util/database.py index fe97b352..ef6af8ee 100644 --- a/cratedb_toolkit/util/database.py +++ b/cratedb_toolkit/util/database.py @@ -391,6 +391,12 @@ def decode_database_table(url: str) -> t.Tuple[str, str]: table = url_.query_params.get("table") if url_.scheme == "crate" and not database: database = url_.query_params.get("schema") + if database is None and table is None: + if url_.scheme.startswith("file"): + _, database, table = url_.path.rsplit("/", 2) + table, _ = table.split(".", 1) + if database is None and table is None: + raise ValueError("Database and table must be specified") from ex return database, table diff --git a/doc/io/mongodb/loader.md b/doc/io/mongodb/loader.md index 7242687a..40e736ee 100644 --- a/doc/io/mongodb/loader.md +++ b/doc/io/mongodb/loader.md @@ -2,18 +2,109 @@ # MongoDB Table Loader ## About -Load data from MongoDB into CrateDB using a one-stop command -`ctk load table mongodb://...`, in order to facilitate convenient -data transfers to be used within data pipelines or ad hoc operations. +Load data from MongoDB and its file formats into CrateDB using a one-stop +command `ctk load table`, in order to facilitate convenient data transfers +to be used within data pipelines or ad hoc operations. 
+
+## Coverage
+CrateDB Toolkit supports different variants to load MongoDB data from
+server instances and filesystems.
+
+- `mongodb://`
+
+  Connect to MongoDB Community or Enterprise Edition.
+
+- `mongodb+srv://`
+
+  Connect to MongoDB Atlas.
+
+- `file+bson://`
+
+  Read [MongoDB Extended JSON] format from filesystem.
 
 ## Install
 ```shell
 pip install --upgrade 'cratedb-toolkit[mongodb]'
 ```
 
-## Example
-Import two data points into MongoDB.
+## Usage
+The MongoDB I/O adapter can process MongoDB data from different sources.
+This section enumerates the relevant connectivity options by way of
+concrete usage examples.
+
+### MongoDB Community and Enterprise
+Transfer data from MongoDB database/collection into CrateDB schema/table.
+```shell
+export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost:4200/testdrive/demo
+ctk load table "mongodb://localhost:27017/testdrive/demo"
+```
+Query data in CrateDB.
+```shell
+export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost:4200/testdrive/demo
+ctk shell --command "SELECT * FROM testdrive.demo;"
+ctk show table "testdrive.demo"
+```
+
+### MongoDB Atlas
+Transfer data from MongoDB Atlas.
+```shell
+export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost:4200/testdrive/demo
+ctk load table "mongodb+srv://john:EeY6OocooL8rungu@testdrive.ahnaik1.mongodb.net/ticker/stocks?batch-size=5000"
+```
+
+### MongoDB JSON/BSON files
+Load data from MongoDB JSON/BSON files, for example produced by the
+`mongoexport` or `mongodump` programs.
+In order to get hold of a few samples' worth of data, the canonical MongoDB C
+driver's [libbson test files] include a few. In this case, let's acquire
+the collection at [mongodb-json-files].
+```shell +git clone https://github.com/ozlerhakan/mongodb-json-files.git +``` +```shell +CRATEDB_SQLALCHEMY_BASEURL=crate://crate@localhost:4200/testdrive +ctk load table \ + "file+bson:///path/to/mongodb-json-files/datasets/books.json" \ + --cratedb-sqlalchemy-url="${CRATEDB_SQLALCHEMY_BASEURL}/books" +``` +Address relative and/or compressed BSON files like +`file+bson:./tmp/testdrive/books.bson.gz`. + +Example queries that fit the schema of `books.json`, and more, can be +found at [](#ready-made-queries). + + +## Options + +### Batch Size +The default batch size is 500. You can adjust the value by appending the HTTP +URL query parameter `batch-size` to the source URL, like +`mongodb+srv://managed.mongodb.net/ticker/stocks?batch-size=5000`. + +### Offset +Use the HTTP URL query parameter `offset` on the source URL, like +`&offset=42`, in order to start processing at this record from the +beginning. + +### Limit +Use the HTTP URL query parameter `limit` on the source URL, like +`&limit=100`, in order to limit processing to a total number of +records. + +## Zyp Transformations +You can use [Zyp Transformations] to change the shape of the data while being +transferred. In order to add it to the pipeline, use the `--transformation` +command line option. + +It is also available for the `migr8 extract` and `migr8 export` commands. +Example transformation files in YAML format can be explored at [examples/zyp]. + + +## Appendix + +### Insert Exercise +Import two data points into MongoDB database `testdrive` and collection `demo`, +using the `mongosh` CLI program. 
```shell mongosh mongodb://localhost:27017/testdrive <=3.10.1", "python-bsonjs<0.5", "rich<14,>=3.3.2", + "undatum<1.1", ] pymongo = [ "jessiql==1.0.0rc1", diff --git a/tests/io/mongodb/books-canonical.ndjson b/tests/io/mongodb/books-canonical.ndjson new file mode 100644 index 00000000..322d4cfb --- /dev/null +++ b/tests/io/mongodb/books-canonical.ndjson @@ -0,0 +1,4 @@ +{"_id":{"$numberInt":"1"},"title":"Unlocking Android","isbn":"1933988673","pageCount":{"$numberInt":"416"},"publishedDate":{"$date":{"$numberLong":"1238569200000"}},"thumbnailUrl":"https://s3.amazonaws.com/AKIAJC5RLADLUMVRPFDQ.book-thumb-images/ableson.jpg","shortDescription":"Unlocking Android: A Developer's Guide provides concise, hands-on instruction for the Android operating system and development tools. This book teaches important architectural concepts in a straightforward writing style and builds on this with practical and useful examples throughout.","longDescription":"Android is an open source mobile phone platform based on the Linux operating system and developed by the Open Handset Alliance, a consortium of over 30 hardware, software and telecom companies that focus on open standards for mobile devices. Led by search giant, Google, Android is designed to deliver a better and more open and cost effective mobile experience. Unlocking Android: A Developer's Guide provides concise, hands-on instruction for the Android operating system and development tools. This book teaches important architectural concepts in a straightforward writing style and builds on this with practical and useful examples throughout. Based on his mobile development experience and his deep knowledge of the arcane Android technical documentation, the author conveys the know-how you need to develop practical applications that build upon or replace any of Androids features, however small. 
Unlocking Android: A Developer's Guide prepares the reader to embrace the platform in easy-to-understand language and builds on this foundation with re-usable Java code examples. It is ideal for corporate and hobbyists alike who have an interest, or a mandate, to deliver software functionality for cell phones. WHAT'S INSIDE: * Android's place in the market * Using the Eclipse environment for Android development * The Intents - how and why they are used * Application classes: o Activity o Service o IntentReceiver * User interface design * Using the ContentProvider to manage data * Persisting data with the SQLite database * Networking examples * Telephony applications * Notification methods * OpenGL, animation & multimedia * Sample Applications ","status":"PUBLISH","authors":["W. Frank Ableson","Charlie Collins","Robi Sen"],"categories":["Open Source","Mobile"]} +{"_id":{"$numberInt":"2"},"title":"Android in Action, Second Edition","isbn":"1935182722","pageCount":{"$numberInt":"592"},"publishedDate":{"$date":{"$numberLong":"1294992000000"}},"thumbnailUrl":"https://s3.amazonaws.com/AKIAJC5RLADLUMVRPFDQ.book-thumb-images/ableson2.jpg","shortDescription":"Android in Action, Second Edition is a comprehensive tutorial for Android developers. Taking you far beyond \"Hello Android,\" this fast-paced book puts you in the driver's seat as you learn important architectural concepts and implementation strategies. You'll master the SDK, build WebKit apps using HTML 5, and even learn to extend or replace Android's built-in features by building useful and intriguing examples. ","longDescription":"When it comes to mobile apps, Android can do almost anything and with this book, so can you! Android runs on mobile devices ranging from smart phones to tablets to countless special-purpose gadgets. It's the broadest mobile platform available. Android in Action, Second Edition is a comprehensive tutorial for Android developers. 
Taking you far beyond \"Hello Android,\" this fast-paced book puts you in the driver's seat as you learn important architectural concepts and implementation strategies. You'll master the SDK, build WebKit apps using HTML 5, and even learn to extend or replace Android's built-in features by building useful and intriguing examples. ","status":"PUBLISH","authors":["W. Frank Ableson","Robi Sen"],"categories":["Java"]} +{"_id":{"$numberInt":"3"},"title":"Specification by Example","isbn":"1617290084","pageCount":{"$numberInt":"0"},"publishedDate":{"$date":{"$numberLong":"1307084400000"}},"thumbnailUrl":"https://s3.amazonaws.com/AKIAJC5RLADLUMVRPFDQ.book-thumb-images/adzic.jpg","status":"PUBLISH","authors":["Gojko Adzic"],"categories":["Software Engineering"]} +{"_id":{"$numberInt":"4"},"title":"Flex 3 in Action","isbn":"1933988746","pageCount":{"$numberInt":"576"},"publishedDate":{"$date":{"$numberLong":"1233561600000"}},"thumbnailUrl":"https://s3.amazonaws.com/AKIAJC5RLADLUMVRPFDQ.book-thumb-images/ahmed.jpg","longDescription":"New web applications require engaging user-friendly interfaces and the cooler, the better. With Flex 3, web developers at any skill level can create high-quality, effective, and interactive Rich Internet Applications (RIAs) quickly and easily. Flex removes the complexity barrier from RIA development by offering sophisticated tools and a straightforward programming language so you can focus on what you want to do instead of how to do it. And now that the major components of Flex are free and open-source, the cost barrier is gone, as well! Flex 3 in Action is an easy-to-follow, hands-on Flex tutorial. Chock-full of examples, this book goes beyond feature coverage and helps you put Flex to work in real day-to-day tasks. You'll quickly master the Flex API and learn to apply the techniques that make your Flex applications stand out from the crowd. Interesting themes, styles, and skins It's in there. Working with databases You got it. 
Interactive forms and validation You bet. Charting techniques to help you visualize data Bam! The expert authors of Flex 3 in Action have one goal to help you get down to business with Flex 3. Fast. Many Flex books are overwhelming to new users focusing on the complexities of the language and the super-specialized subjects in the Flex eco-system; Flex 3 in Action filters out the noise and dives into the core topics you need every day. Using numerous easy-to-understand examples, Flex 3 in Action gives you a strong foundation that you can build on as the complexity of your projects increases.","status":"PUBLISH","authors":["Tariq Ahmed with Jon Hirschi","Faisal Abid"],"categories":["Internet"]} diff --git a/tests/io/mongodb/books-relaxed.ndjson b/tests/io/mongodb/books-relaxed.ndjson new file mode 100644 index 00000000..45db1f27 --- /dev/null +++ b/tests/io/mongodb/books-relaxed.ndjson @@ -0,0 +1,4 @@ +{"_id":1,"title":"Unlocking Android","isbn":"1933988673","pageCount":416,"publishedDate":{"$date":"2009-04-01T07:00:00Z"},"thumbnailUrl":"https://s3.amazonaws.com/AKIAJC5RLADLUMVRPFDQ.book-thumb-images/ableson.jpg","shortDescription":"Unlocking Android: A Developer's Guide provides concise, hands-on instruction for the Android operating system and development tools. This book teaches important architectural concepts in a straightforward writing style and builds on this with practical and useful examples throughout.","longDescription":"Android is an open source mobile phone platform based on the Linux operating system and developed by the Open Handset Alliance, a consortium of over 30 hardware, software and telecom companies that focus on open standards for mobile devices. Led by search giant, Google, Android is designed to deliver a better and more open and cost effective mobile experience. Unlocking Android: A Developer's Guide provides concise, hands-on instruction for the Android operating system and development tools. 
This book teaches important architectural concepts in a straightforward writing style and builds on this with practical and useful examples throughout. Based on his mobile development experience and his deep knowledge of the arcane Android technical documentation, the author conveys the know-how you need to develop practical applications that build upon or replace any of Androids features, however small. Unlocking Android: A Developer's Guide prepares the reader to embrace the platform in easy-to-understand language and builds on this foundation with re-usable Java code examples. It is ideal for corporate and hobbyists alike who have an interest, or a mandate, to deliver software functionality for cell phones. WHAT'S INSIDE: * Android's place in the market * Using the Eclipse environment for Android development * The Intents - how and why they are used * Application classes: o Activity o Service o IntentReceiver * User interface design * Using the ContentProvider to manage data * Persisting data with the SQLite database * Networking examples * Telephony applications * Notification methods * OpenGL, animation & multimedia * Sample Applications ","status":"PUBLISH","authors":["W. Frank Ableson","Charlie Collins","Robi Sen"],"categories":["Open Source","Mobile"]} +{"_id":2,"title":"Android in Action, Second Edition","isbn":"1935182722","pageCount":592,"publishedDate":{"$date":"2011-01-14T08:00:00Z"},"thumbnailUrl":"https://s3.amazonaws.com/AKIAJC5RLADLUMVRPFDQ.book-thumb-images/ableson2.jpg","shortDescription":"Android in Action, Second Edition is a comprehensive tutorial for Android developers. Taking you far beyond \"Hello Android,\" this fast-paced book puts you in the driver's seat as you learn important architectural concepts and implementation strategies. You'll master the SDK, build WebKit apps using HTML 5, and even learn to extend or replace Android's built-in features by building useful and intriguing examples. 
","longDescription":"When it comes to mobile apps, Android can do almost anything and with this book, so can you! Android runs on mobile devices ranging from smart phones to tablets to countless special-purpose gadgets. It's the broadest mobile platform available. Android in Action, Second Edition is a comprehensive tutorial for Android developers. Taking you far beyond \"Hello Android,\" this fast-paced book puts you in the driver's seat as you learn important architectural concepts and implementation strategies. You'll master the SDK, build WebKit apps using HTML 5, and even learn to extend or replace Android's built-in features by building useful and intriguing examples. ","status":"PUBLISH","authors":["W. Frank Ableson","Robi Sen"],"categories":["Java"]} +{"_id":3,"title":"Specification by Example","isbn":"1617290084","pageCount":0,"publishedDate":{"$date":"2011-06-03T07:00:00Z"},"thumbnailUrl":"https://s3.amazonaws.com/AKIAJC5RLADLUMVRPFDQ.book-thumb-images/adzic.jpg","status":"PUBLISH","authors":["Gojko Adzic"],"categories":["Software Engineering"]} +{"_id":4,"title":"Flex 3 in Action","isbn":"1933988746","pageCount":576,"publishedDate":{"$date":"2009-02-02T08:00:00Z"},"thumbnailUrl":"https://s3.amazonaws.com/AKIAJC5RLADLUMVRPFDQ.book-thumb-images/ahmed.jpg","longDescription":"New web applications require engaging user-friendly interfaces and the cooler, the better. With Flex 3, web developers at any skill level can create high-quality, effective, and interactive Rich Internet Applications (RIAs) quickly and easily. Flex removes the complexity barrier from RIA development by offering sophisticated tools and a straightforward programming language so you can focus on what you want to do instead of how to do it. And now that the major components of Flex are free and open-source, the cost barrier is gone, as well! Flex 3 in Action is an easy-to-follow, hands-on Flex tutorial. 
Chock-full of examples, this book goes beyond feature coverage and helps you put Flex to work in real day-to-day tasks. You'll quickly master the Flex API and learn to apply the techniques that make your Flex applications stand out from the crowd. Interesting themes, styles, and skins It's in there. Working with databases You got it. Interactive forms and validation You bet. Charting techniques to help you visualize data Bam! The expert authors of Flex 3 in Action have one goal to help you get down to business with Flex 3. Fast. Many Flex books are overwhelming to new users focusing on the complexities of the language and the super-specialized subjects in the Flex eco-system; Flex 3 in Action filters out the noise and dives into the core topics you need every day. Using numerous easy-to-understand examples, Flex 3 in Action gives you a strong foundation that you can build on as the complexity of your projects increases.","status":"PUBLISH","authors":["Tariq Ahmed with Jon Hirschi","Faisal Abid"],"categories":["Internet"]} diff --git a/tests/io/mongodb/books.bson.gz b/tests/io/mongodb/books.bson.gz new file mode 100644 index 0000000000000000000000000000000000000000..d8fc1bd4daab66770180026b99a08a80dfafc197 GIT binary patch literal 2452 zcmV;F32XKriwFpkecxsP17dG)YjZAQb8l_{?O0uJ97z>zo4^vmLqJ*yBt9yU*sw4& zN$kxhJnSS+VkdU++71>^P}5!0U77BxPF1zXlP4s^1Al=h;J5Hs_#d2mt9yDT4hEz} z;$i(VJ>}|K_v74iPtD)Ho+Q_PtFvS?Ns`T=L#dJ*cpRI|S)B=O#5fJwnt?c0sWq8+ znrZr;Y-&F-$+xig&cp4ycOGor-`Yy9)iPI4Y-2)l2#Z(iW>RWjsBA|Dl_Y=v(;tQ# zbWYJ!6C-u`+Lg%xOcWv1{?V;letRS<`OX?S_oLKSx5h8`#xI`SJ=!1d?7u$v_0i$8 zoi9ccYiC2YG1L_f^ta@sRNk7c(VIF?Hhp1T*ik-px@KShy73=GB(xV&;LzHsLK&~m zln70*PRmkEtrMMvOr5FH*2?*jIFU1La znW{`s1fp(&=QrU}D3r(?j&wJ@@Ja?DvyjCAbX7v0p&`iT2#$-d;QId#R}aS< zpL`Sab@=-6{@(F!az!=)+~boWvFYvPJNP&qiD$skOpGH(C7XAWt5`om`B74nBv(=j z9EDS!-@uDt<>Unbfsa34Z5eu_XXx+Z@wF*!%GGQ`T4&_#hYudyyT854$bZ?mz{tP< z^xf~o1xDWg@)-GF5b}>UJ`lhoF%~-=SW%$ojn2>?&Yt1ZGkCOD15uzTen=f)0Gfuo 
zgAq?a=}=EL04!Md3!tdtO17;)paVK=NgdG{-$n-AdSp_jJODz%mfceIFWJfoZuv1|{0Al+# zD&jI^K@qu513j_Xph{3y#Uv$-<_JL*Q=2xNmlyz2o#B1YC0r-qk=Tzufnrb#k;9KL z5YMg6OV!gq(gA?z++Z`BK2s$z6TF?MKm!p?d87}Y_59a#;hu1h?SsTPKDnlv>5v7ZkR59C?>%1AXS-K8C!dSx&>afA|bWVDKvOvk?dr$(4Gc#sb@;e3kxux(LZSl zDg=S{v(#D@b!`CHR~l~F{L{uT=wT%+;yF+lt#;zno$iiLZXS!hSI2uhPalc)?`JFH zIhLc<8jxNyFj2edHBWt7f0~xMhRjmtOgn2BcpuO{yxzWig4gyCi$Rn_Dryvo1ry&w zkoyfGyK|_=jYv!Bz4A+U>Cdt}JEJ()R*#h<3R_={=N_q4alZO^@P+d`rI;g^T@3b# z<<}1*-!O(y1%d@CA-nyD01Zk=FY$6Ke<7j1*auheBF%+v|0@;d*3DMc%`#QsZ>rg1 zy?eSBZP3#$HI+hRX3PK4{CK`kiVSjqH+&+hrVP4LnO?ej%*U;!7zQpERy&sqt7}+) zf@xH0N*Csq@yX`aC3@?VXz$zj;F)*KqXf2@pw#ff0+af zs;4o9t6J>do%>r4Z{L3K*{T+!zxTD+PYzCQU(jMNz6>pvz0+x{#4gG0_xfBsw{K=Z zHodo=uj_Ho#}s?l?N8~Fq(Js2$8z~mo2n~)s;FGgN_8%_`}@pAxqAPzd#fq>`~yEA ze4gC<@&6`81&|q2^uF-=0fg5pH5YR=S?>d<-Zt71`Q$Qh?+k7@rJp@|t3dpqC2__~F}zX?D=-nL&4L zQ9S;@-$xqiiP*;(5_GFk{PbvV>^~Ir+FSrj#V$VG($A(cB>RrJc%;ZZ(5y8 z)=6~tC#i+eO0%0dym+cM!E0=CIFh1O4 z+sivO^+3EOM5*?+>4beHp&4+xYxB7ZvnXVs93x^lqt4DVRs(yjwrD3^MPai}^%2T| zKaB?d4J9f9M-u(Com)KOy19mZhzPj~T>fagZ3B{65q)RqnN~INcJw0l(&&gztVCux zt(?^}?Wx(`brbiotWbGU6s&ebt517Kt)hX26kX49i#muk!#TP#vvbbxgnrbZ-;(A% zFQ6Au7S7cH@d2+ACOo5-FffNPZXK8~perIHVjk`qT5qp`%O0ltSS!KT^d&9$2H_0b zAF~iIO3+N+fL*%$M-Onj8^+tqFD}MA)nyix%*6N4{(epn4fEN_v2`X SgiJ@{c>E`N1@^SQ7XSbvsH~p= literal 0 HcmV?d00001 diff --git a/tests/io/mongodb/conftest.py b/tests/io/mongodb/conftest.py index 419b98e6..0e38c79f 100644 --- a/tests/io/mongodb/conftest.py +++ b/tests/io/mongodb/conftest.py @@ -11,6 +11,7 @@ pytest.importorskip("bsonjs", reason="Skipping tests because bsonjs is not installed") pytest.importorskip("pymongo", reason="Skipping tests because pymongo is not installed") pytest.importorskip("rich", reason="Skipping tests because rich is not installed") +pytest.importorskip("undatum", reason="Skipping tests because undatum is not installed") # Define databases to be deleted before running each test case. 
import pytest

from cratedb_toolkit.io.mongodb.api import mongodb_copy
from tests.conftest import check_sqlalchemy2

pytestmark = pytest.mark.mongodb


@pytest.fixture(scope="module", autouse=True)
def check_prerequisites():
    """
    This subsystem needs SQLAlchemy 2.x.
    """
    check_sqlalchemy2()


def verify_demo_table(cratedb, timestamp_type: str):
    """
    Verify metadata, content, and schema of the `testdrive.demo` table
    after a `mongodb_copy` transfer has populated it from the 4-record
    "books" fixture.

    :param cratedb: The CrateDB test fixture providing `database` accessors.
    :param timestamp_type: Expected CrateDB storage type of the
        `publishedDate` attribute, as reported by `pg_typeof`.
    """
    # Verify metadata in target database.
    assert cratedb.database.table_exists("testdrive.demo") is True
    assert cratedb.database.refresh_table("testdrive.demo") is True
    assert cratedb.database.count_records("testdrive.demo") == 4

    # Verify content in target database.
    results = cratedb.database.run_sql("SELECT * FROM testdrive.demo WHERE data['_id'] = 1;", records=True)
    assert results[0]["data"]["authors"] == [
        "W. Frank Ableson",
        "Charlie Collins",
        "Robi Sen",
    ]

    # Verify schema in target database.
    type_result = cratedb.database.run_sql(
        "SELECT pg_typeof(data['publishedDate']) AS type FROM testdrive.demo;", records=True
    )
    assert type_result[0]["type"] == timestamp_type


def test_mongodb_copy_filesystem_json_relaxed(caplog, cratedb):
    """
    Verify MongoDB Extended JSON (relaxed mode) -> CrateDB data transfer.
    """

    # Define source and target URLs.
    json_resource = "file+bson:./tests/io/mongodb/books-relaxed.ndjson"
    cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo"

    # Run transfer command.
    mongodb_copy(json_resource, cratedb_url)

    # Relaxed-mode timestamps end up stored as `bigint`.
    verify_demo_table(cratedb, timestamp_type="bigint")


def test_mongodb_copy_filesystem_json_canonical(caplog, cratedb):
    """
    Verify MongoDB Extended JSON (canonical mode) -> CrateDB data transfer.
    """

    # Define source and target URLs.
    json_resource = "file+bson:./tests/io/mongodb/books-canonical.ndjson"
    cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo"

    # Run transfer command.
    mongodb_copy(json_resource, cratedb_url)

    # FIXME: Why does the "canonical format" yield worse results?
    verify_demo_table(cratedb, timestamp_type="text")


def test_mongodb_copy_filesystem_bson(caplog, cratedb):
    """
    Verify MongoDB BSON (gzipped) -> CrateDB data transfer.
    """

    # Define source and target URLs.
    bson_resource = "file+bson:./tests/io/mongodb/books.bson.gz"
    cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo"

    # Run transfer command.
    mongodb_copy(bson_resource, cratedb_url)

    verify_demo_table(cratedb, timestamp_type="bigint")