Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MongoDB: More ad hoc fixes for supporting real-world data #255

Merged
merged 12 commits into from
Sep 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,20 @@


## Unreleased
- MongoDB: Rename columns with leading underscores to use double leading underscores
- MongoDB: Add support for UUID types
- MongoDB: Improve reading timestamps in previous BSON formats
- MongoDB: Fix processing empty arrays/lists. By default, assume `TEXT` as inner type.
- MongoDB: For `ctk load table`, use "partial" scan for inferring the collection schema,
based on the first 10,000 documents.
- MongoDB: Stop leaking `UNKNOWN` fields into SQL DDL.
This means the corresponding column definitions are omitted from the SQL DDL.
- MongoDB: Make `ctk load table` use the `data OBJECT(DYNAMIC)` mapping strategy.
- MongoDB: Sanitize lists of varying objects
- MongoDB: Add `--treatment` option for applying special treatments to certain items
on real-world data
- MongoDB: Use pagination on source collection, for creating batches towards CrateDB
- MongoDB: Unlock importing MongoDB Extended JSON files using `file+bson://...`

## 2024/09/02 v0.0.21
- DynamoDB: Add special decoding for varied lists.
Expand Down
73 changes: 48 additions & 25 deletions cratedb_toolkit/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from abc import abstractmethod
from pathlib import Path

from yarl import URL
from boltons.urlutils import URL

from cratedb_toolkit.api.guide import GuidingTexts
from cratedb_toolkit.cluster.util import get_cluster_info
Expand Down Expand Up @@ -37,7 +37,10 @@
logger.info(f"Connecting to CrateDB Cloud Cluster: {self.cloud_id}")

def load_table(
self, resource: InputOutputResource, target: t.Optional[TableAddress] = None, transformation: Path = None
self,
resource: InputOutputResource,
target: t.Optional[TableAddress] = None,
transformation: Path = None,
):
"""
Load data into a database table on CrateDB Cloud.
Expand Down Expand Up @@ -110,46 +113,66 @@
ctk load table influxdb2://example:token@localhost:8086/testdrive/demo
ctk load table mongodb://localhost:27017/testdrive/demo
"""
source_url = resource.url
source_url_obj = URL(resource.url)
target_url = self.address.dburi
source_url_obj = URL(source_url)
if source_url.startswith("dynamodb"):

if source_url_obj.scheme.startswith("dynamodb"):
from cratedb_toolkit.io.dynamodb.api import dynamodb_copy

if not dynamodb_copy(source_url, target_url, progress=True):
if not dynamodb_copy(str(source_url_obj), target_url, progress=True):
msg = "Data loading failed"
logger.error(msg)
raise OperationFailed(msg)

elif source_url.startswith("influxdb"):
elif source_url_obj.scheme.startswith("file"):
if "+bson" in source_url_obj.scheme or "+mongodb" in source_url_obj.scheme:
mongodb_copy_generic(

Check warning on line 129 in cratedb_toolkit/api/main.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/api/main.py#L128-L129

Added lines #L128 - L129 were not covered by tests
str(source_url_obj),
target_url,
transformation=transformation,
progress=True,
)

elif source_url_obj.scheme.startswith("influxdb"):
from cratedb_toolkit.io.influxdb import influxdb_copy

http_scheme = "http://"
if asbool(source_url_obj.query.get("ssl")):
http_scheme = "https://"
source_url = source_url.replace("influxdb2://", http_scheme)
if not influxdb_copy(source_url, target_url, progress=True):
http_scheme = "http"
if asbool(source_url_obj.query_params.get("ssl")):
http_scheme = "https"

Check warning on line 141 in cratedb_toolkit/api/main.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/api/main.py#L141

Added line #L141 was not covered by tests
source_url_obj.scheme = source_url_obj.scheme.replace("influxdb2", http_scheme)
if not influxdb_copy(str(source_url_obj), target_url, progress=True):
msg = "Data loading failed"
logger.error(msg)
raise OperationFailed(msg)
elif source_url.startswith("mongodb"):
if "+cdc" in source_url:
source_url = source_url.replace("+cdc", "")

elif source_url_obj.scheme.startswith("mongodb"):
if "+cdc" in source_url_obj.scheme:
source_url_obj.scheme = source_url_obj.scheme.replace("+cdc", "")

Check warning on line 150 in cratedb_toolkit/api/main.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/api/main.py#L150

Added line #L150 was not covered by tests
from cratedb_toolkit.io.mongodb.api import mongodb_relay_cdc

mongodb_relay_cdc(source_url, target_url, progress=True)
mongodb_relay_cdc(str(source_url_obj), target_url, progress=True)

Check warning on line 153 in cratedb_toolkit/api/main.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/api/main.py#L153

Added line #L153 was not covered by tests
else:
from cratedb_toolkit.io.mongodb.api import mongodb_copy

if not mongodb_copy(
source_url,
mongodb_copy_generic(
str(source_url_obj),
target_url,
transformation=transformation,
limit=int(source_url_obj.query.get("limit", 0)),
progress=True,
):
msg = "Data loading failed"
logger.error(msg)
raise OperationFailed(msg)
)
else:
raise NotImplementedError("Importing resource not implemented yet")


def mongodb_copy_generic(
    source_url: str,
    target_url: str,
    transformation: t.Union[Path, None] = None,
    progress: bool = False,
):
    """
    Run `mongodb_copy` and turn a falsy outcome into an exception.

    :param source_url: URL of the source resource, handed to `mongodb_copy` unchanged.
    :param target_url: URL of the target database, handed to `mongodb_copy` unchanged.
    :param transformation: Optional path to a transformation file.
    :param progress: Whether to ask `mongodb_copy` for progress reporting.
    :raises OperationFailed: When `mongodb_copy` returns a falsy value.
    """
    # Imported at call time, within the function body.
    from cratedb_toolkit.io.mongodb.api import mongodb_copy

    succeeded = mongodb_copy(
        source_url,
        target_url,
        transformation=transformation,
        progress=progress,
    )
    if succeeded:
        return

    msg = "Data loading failed"
    logger.error(msg)
    raise OperationFailed(msg)

Check warning on line 178 in cratedb_toolkit/api/main.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/api/main.py#L176-L178

Added lines #L176 - L178 were not covered by tests
134 changes: 134 additions & 0 deletions cratedb_toolkit/io/mongodb/adapter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
import itertools
import logging
import typing as t
from abc import abstractmethod
from functools import cached_property
from pathlib import Path

import boltons.urlutils
import polars as pl
import pymongo
import yarl
from attrs import define, field
from boltons.urlutils import URL
from bson.raw_bson import RawBSONDocument
from undatum.common.iterable import IterableData

from cratedb_toolkit.io.mongodb.util import batches
from cratedb_toolkit.model import DatabaseAddress

logger = logging.getLogger(__name__)


@define
class MongoDBAdapterBase:
address: DatabaseAddress
effective_url: URL
database_name: str
collection_name: str

_custom_query_parameters = ["batch-size", "limit", "offset"]

@classmethod
def from_url(cls, url: t.Union[str, boltons.urlutils.URL, yarl.URL]):
    """
    Alternate constructor: build an adapter instance from a URL.

    The URL is converted to a string when needed, parsed into a
    `DatabaseAddress`, and decoded into an effective URL and a collection
    address. The `schema` and `table` attributes of the collection address
    become the database name and the collection name.

    :param url: The resource URL, as a string or as a URL object.
    :return: A new instance of `cls`.
    """
    url_text = url if isinstance(url, str) else str(url)
    address = DatabaseAddress.from_string(url_text)
    effective_url, collection_address = address.decode()
    logger.info(f"Collection address: {collection_address}")
    database_name = collection_address.schema
    collection_name = collection_address.table

    # Remove the parameters listed in `_custom_query_parameters` from the
    # effective URL. NOTE(review): presumably because they are toolkit-level
    # options that must not be forwarded to the database driver -- confirm.
    for parameter_name in cls._custom_query_parameters:
        effective_url.query_params.pop(parameter_name, None)

    return cls(
        address=address,
        effective_url=effective_url,
        database_name=database_name,
        collection_name=collection_name,
    )

def __attrs_post_init__(self):
self.setup()

@cached_property
def batch_size(self) -> int:
return int(self.address.uri.query_params.get("batch-size", 500))

@cached_property
def limit(self) -> int:
return int(self.address.uri.query_params.get("limit", 0))

@cached_property
def offset(self) -> int:
return int(self.address.uri.query_params.get("offset", 0))

@abstractmethod
def setup(self):
raise NotImplementedError()

Check warning on line 67 in cratedb_toolkit/io/mongodb/adapter.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/adapter.py#L67

Added line #L67 was not covered by tests

@abstractmethod
def record_count(self, filter_=None) -> int:
raise NotImplementedError()

Check warning on line 71 in cratedb_toolkit/io/mongodb/adapter.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/adapter.py#L71

Added line #L71 was not covered by tests

@abstractmethod
def query(self):
raise NotImplementedError()

Check warning on line 75 in cratedb_toolkit/io/mongodb/adapter.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/adapter.py#L75

Added line #L75 was not covered by tests


@define
class MongoDBFileAdapter(MongoDBAdapterBase):
_path: Path = field(init=False)

def setup(self):
self._path = Path(self.address.uri.path)

def record_count(self, filter_=None) -> int:
    """
    Count the records of the file by counting its newline bytes.

    The file is read in binary mode, in chunks of 1 MiB, so its content is
    never held in memory as a whole. The file handle is managed by a `with`
    block, so it is closed when counting finishes and also when reading fails.

    https://stackoverflow.com/a/27517681

    :param filter_: Accepted as part of the method signature, but not used
        by this implementation.
    :return: The number of newline bytes found in the file.
    """
    chunk_size = 1024 * 1024
    count = 0
    with open(self._path, "rb") as f:
        # An empty chunk signals the end of the file.
        while chunk := f.read(chunk_size):
            count += chunk.count(b"\n")
    return count

def query(self):
if not self._path.exists():
raise FileNotFoundError(f"Resource not found: {self._path}")

Check warning on line 95 in cratedb_toolkit/io/mongodb/adapter.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/adapter.py#L95

Added line #L95 was not covered by tests
if self.offset:
raise NotImplementedError("Using offsets is not supported by Polars' NDJSON reader")

Check warning on line 97 in cratedb_toolkit/io/mongodb/adapter.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/adapter.py#L97

Added line #L97 was not covered by tests
if self._path.suffix in [".ndjson", ".jsonl"]:
data = pl.read_ndjson(self._path, batch_size=self.batch_size, n_rows=self.limit or None).to_dicts()
elif ".bson" in str(self._path):
data = IterableData(str(self._path), options={"format_in": "bson"}).iter()
else:
raise ValueError(f"Unsupported file type: {self._path.suffix}")

Check warning on line 103 in cratedb_toolkit/io/mongodb/adapter.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/adapter.py#L103

Added line #L103 was not covered by tests
return batches(data, self.batch_size)


@define
class MongoDBServerAdapter(MongoDBAdapterBase):
    """
    Adapter that reads documents from a collection through `pymongo`.
    """

    # Both handles are excluded from `__init__` and populated by `setup()`.
    _mongodb_client: pymongo.MongoClient = field(init=False)
    _mongodb_collection: pymongo.collection.Collection = field(init=False)

    def setup(self):
        """
        Create the client from the effective URL and look up the collection.
        """
        client: pymongo.MongoClient = pymongo.MongoClient(
            str(self.effective_url),
            document_class=RawBSONDocument,
            datetime_conversion="DATETIME_AUTO",
        )
        self._mongodb_client = client
        database = client[self.database_name]
        self._mongodb_collection = database[self.collection_name]

    def record_count(self, filter_=None) -> int:
        """
        Return the number of documents matching `filter_`.

        A missing or empty filter is replaced by an empty filter document.
        """
        criteria = filter_ or {}
        return self._mongodb_collection.count_documents(filter=criteria)

    def query(self):
        """
        Query the collection and return the result of `batches()` over the cursor.

        The cursor is configured, in this order, with the adapter's
        `batch_size`, `offset` (as skip) and `limit`.
        """
        cursor = self._mongodb_collection.find()
        cursor = cursor.batch_size(self.batch_size)
        cursor = cursor.skip(self.offset)
        cursor = cursor.limit(self.limit)
        return batches(cursor, self.batch_size)


def mongodb_adapter_factory(mongodb_uri: URL) -> MongoDBAdapterBase:
    """
    Create the adapter that matches the scheme of the given URL.

    Schemes starting with `file` select `MongoDBFileAdapter`, schemes
    starting with `mongodb` select `MongoDBServerAdapter`.

    :param mongodb_uri: URL object exposing a `scheme` attribute.
    :return: The adapter created by the selected class' `from_url()`.
    :raises ValueError: When the scheme matches neither prefix.
    """
    scheme = mongodb_uri.scheme
    # Order matters: prefixes are probed first to last.
    dispatch = (
        ("file", MongoDBFileAdapter),
        ("mongodb", MongoDBServerAdapter),
    )
    for prefix, adapter_class in dispatch:
        if scheme.startswith(prefix):
            return adapter_class.from_url(mongodb_uri)
    raise ValueError("Unable to create MongoDB adapter")

Check warning on line 134 in cratedb_toolkit/io/mongodb/adapter.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/adapter.py#L134

Added line #L134 was not covered by tests
43 changes: 37 additions & 6 deletions cratedb_toolkit/io/mongodb/api.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,23 @@
import argparse
import logging
import typing as t
from pathlib import Path

from cratedb_toolkit.io.mongodb.cdc import MongoDBCDCRelayCrateDB
from cratedb_toolkit.io.mongodb.copy import MongoDBFullLoad
from cratedb_toolkit.io.mongodb.core import export, extract, translate
from cratedb_toolkit.io.mongodb.transform import TransformationManager
from cratedb_toolkit.model import DatabaseAddress
from cratedb_toolkit.util.cr8 import cr8_insert_json
from cratedb_toolkit.util.database import DatabaseAdapter

logger = logging.getLogger(__name__)


def mongodb_copy(source_url, target_url, transformation: Path = None, limit: int = 0, progress: bool = False):
def mongodb_copy_migr8(source_url, target_url, transformation: Path = None, limit: int = 0, progress: bool = False):
"""
Transfer MongoDB collection using migr8.

Synopsis
--------
export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost:4200/testdrive/demo
Expand All @@ -37,12 +42,11 @@ def mongodb_copy(source_url, target_url, transformation: Path = None, limit: int
# 1. Extract schema from MongoDB collection.
logger.info(f"Extracting schema from MongoDB: {mongodb_database}.{mongodb_collection}")
extract_args = argparse.Namespace(
url=str(mongodb_uri),
url=str(mongodb_uri) + f"&limit={limit}",
database=mongodb_database,
collection=mongodb_collection,
scan="full",
scan="partial",
transformation=transformation,
limit=limit,
)
mongodb_schema = extract(extract_args)
count = mongodb_schema[mongodb_collection]["count"]
Expand Down Expand Up @@ -71,18 +75,45 @@ def mongodb_copy(source_url, target_url, transformation: Path = None, limit: int
f"source={mongodb_collection_address.fullname}, target={cratedb_table_address.fullname}"
)
export_args = argparse.Namespace(
url=str(mongodb_uri),
url=str(mongodb_uri) + f"&limit={limit}",
database=mongodb_database,
collection=mongodb_collection,
transformation=transformation,
limit=limit,
)
buffer = export(export_args)
cr8_insert_json(infile=buffer, hosts=cratedb_address.httpuri, table=cratedb_table_address.fullname)

return True


def mongodb_copy(source_url, target_url, transformation: t.Union[Path, None] = None, progress: bool = False):
    """
    Transfer a MongoDB collection using the translator component.

    Builds a `MongoDBFullLoad` job from the given URLs and starts it.

    Synopsis
    --------
    export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost:4200/testdrive/demo
    ctk load table mongodb://localhost:27017/testdrive/demo

    :param source_url: Source URL, passed to `MongoDBFullLoad` as `mongodb_url`.
    :param target_url: Target URL, passed to `MongoDBFullLoad` as `cratedb_url`.
    :param transformation: Optional path to a transformation file. When given,
        a `TransformationManager` is created from it.
    :param progress: Passed through to `MongoDBFullLoad`.
    :return: Always `True` once `start()` has returned.
    """

    logger.info(f"Invoking MongoDBFullLoad. source_url={source_url}")

    # A transformation manager is only created when a transformation is given.
    tm = TransformationManager(path=transformation) if transformation else None

    # Run the `full-load` procedure.
    MongoDBFullLoad(
        mongodb_url=source_url,
        cratedb_url=target_url,
        tm=tm,
        progress=progress,
    ).start()
    return True


def mongodb_relay_cdc(source_url, target_url, progress: bool = False):
"""
Synopsis
Expand Down
6 changes: 1 addition & 5 deletions cratedb_toolkit/io/mongodb/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
def extract_parser(subargs):
parser = subargs.add_parser("extract", help="Extract a schema from a MongoDB database")
parser.add_argument("--url", default="mongodb://localhost:27017", help="MongoDB URL")
parser.add_argument("--host", default="localhost", help="MongoDB host")
parser.add_argument("--port", default=27017, help="MongoDB port")
parser.add_argument("--database", required=True, help="MongoDB database")
parser.add_argument("--collection", help="MongoDB collection to create a schema for")
parser.add_argument(
Expand All @@ -42,10 +40,8 @@ def translate_parser(subargs):
def export_parser(subargs):
parser = subargs.add_parser("export", help="Export a MongoDB collection as plain JSON")
parser.add_argument("--url", default="mongodb://localhost:27017", help="MongoDB URL")
parser.add_argument("--collection", required=True)
parser.add_argument("--host", default="localhost", help="MongoDB host")
parser.add_argument("--port", default=27017, help="MongoDB port")
parser.add_argument("--database", required=True, help="MongoDB database")
parser.add_argument("--collection", required=True, help="MongoDB collection to export")
parser.add_argument("--limit", type=int, default=0, required=False, help="Limit export to N documents")
parser.add_argument("--transformation", type=Path, required=False, help="Zyp transformation file")

Expand Down
Loading