diff --git a/CHANGES.md b/CHANGES.md index a18e5c5b..ba46589a 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -2,6 +2,9 @@ ## Unreleased +- DynamoDB: Add special decoding for varied lists. + Store them into a separate `OBJECT(IGNORED)` column in CrateDB. +- DynamoDB: Add pagination support for `full-load` table loader ## 2024/08/27 v0.0.20 - DMS/DynamoDB: Fix table name quoting within CDC processor handler diff --git a/cratedb_toolkit/io/dynamodb/adapter.py b/cratedb_toolkit/io/dynamodb/adapter.py index 4980da86..fbdb2959 100644 --- a/cratedb_toolkit/io/dynamodb/adapter.py +++ b/cratedb_toolkit/io/dynamodb/adapter.py @@ -1,9 +1,14 @@ +import logging +import typing as t + import boto3 from yarl import URL +logger = logging.getLogger(__name__) + class DynamoDBAdapter: - def __init__(self, dynamodb_url: URL, echo: bool = False): + def __init__(self, dynamodb_url: URL): self.session = boto3.Session( aws_access_key_id=dynamodb_url.user, aws_secret_access_key=dynamodb_url.password, @@ -15,11 +20,37 @@ def __init__(self, dynamodb_url: URL, echo: bool = False): self.dynamodb_resource = self.session.resource("dynamodb", endpoint_url=endpoint_url) self.dynamodb_client = self.session.client("dynamodb", endpoint_url=endpoint_url) - def scan(self, table_name: str): + def scan( + self, + table_name: str, + page_size: int = 1000, + consistent_read: bool = False, + on_error: t.Literal["log", "raise"] = "log", + ) -> t.Generator[t.Dict, None, None]: """ - Return all items from DynamoDB table. + Fetch and generate all items from a DynamoDB table, with pagination. + + https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Scan.html#Scan.Pagination """ - return self.dynamodb_client.scan(TableName=table_name) + key = None + while True: + try: + scan_kwargs = {"TableName": table_name, "ConsistentRead": consistent_read, "Limit": page_size} + if key is not None: + scan_kwargs += {"ExclusiveStartKey": key} + response = self.dynamodb_client.scan(**scan_kwargs) + yield response + key = response.get("LastEvaluatedKey", None) + if key is None: + break + except Exception as ex: + if on_error == "log": + logger.exception("Error reading DynamoDB table") + elif on_error == "raise": + raise + else: + raise ValueError(f"Unknown 'on_error' value: {on_error}") from ex + break def count_records(self, table_name: str): table = self.dynamodb_resource.Table(table_name) diff --git a/cratedb_toolkit/io/dynamodb/copy.py b/cratedb_toolkit/io/dynamodb/copy.py index d6af7f51..b2e1c092 100644 --- a/cratedb_toolkit/io/dynamodb/copy.py +++ b/cratedb_toolkit/io/dynamodb/copy.py @@ -9,6 +9,7 @@ from cratedb_toolkit.io.dynamodb.adapter import DynamoDBAdapter from cratedb_toolkit.model import DatabaseAddress from cratedb_toolkit.util import DatabaseAdapter +from cratedb_toolkit.util.data import asbool logger = logging.getLogger(__name__) @@ -23,6 +24,7 @@ def __init__( dynamodb_url: str, cratedb_url: str, progress: bool = False, + debug: bool = True, ): cratedb_address = DatabaseAddress.from_string(cratedb_url) cratedb_sqlalchemy_url, cratedb_table_address = cratedb_address.decode() @@ -36,6 +38,10 @@ def __init__( self.translator = DynamoDBFullLoadTranslator(table_name=self.cratedb_table) self.progress = progress + self.debug = debug + + self.page_size: int = int(self.dynamodb_url.query.get("page-size", 2500)) + self.consistent_read: bool = asbool(self.dynamodb_url.query.get("consistent-read", False)) def start(self): """ @@ -43,6 +49,9 @@ def start(self): """ records_in = self.dynamodb_adapter.count_records(self.dynamodb_table) logger.info(f"Source: DynamoDB table={self.dynamodb_table} count={records_in}") + logger_on_error = logger.warning + if self.debug: + logger_on_error = logger.exception with self.cratedb_adapter.engine.connect() as connection: if not self.cratedb_adapter.table_exists(self.cratedb_table): connection.execute(sa.text(self.translator.sql_ddl)) @@ -50,24 +59,26 @@ def start(self): records_target = self.cratedb_adapter.count_records(self.cratedb_table) logger.info(f"Target: CrateDB table={self.cratedb_table} count={records_target}") progress_bar = tqdm(total=records_in) - result = self.dynamodb_adapter.scan(table_name=self.dynamodb_table) records_out = 0 - for operation in self.items_to_operations(result["Items"]): + for result in self.dynamodb_adapter.scan( + table_name=self.dynamodb_table, + consistent_read=self.consistent_read, + page_size=self.page_size, + ): + result_size = len(result["Items"]) + try: + operation = self.translator.to_sql(result["Items"]) + except Exception as ex: + logger_on_error(f"Transforming query failed: {ex}") + continue try: connection.execute(sa.text(operation.statement), operation.parameters) - records_out += 1 - except sa.exc.ProgrammingError as ex: - logger.warning(f"Running query failed: {ex}") - progress_bar.update() + records_out += result_size + progress_bar.update(n=result_size) + except Exception as ex: + logger_on_error(f"Executing query failed: {ex}") progress_bar.close() connection.commit() logger.info(f"Number of records written: {records_out}") - if records_out < records_in: + if records_out == 0: logger.warning("No data has been copied") - - def items_to_operations(self, items): - """ - Convert data for record items to INSERT statements. - """ - for item in items: - yield self.translator.to_sql(item) diff --git a/cratedb_toolkit/testing/testcontainers/localstack.py b/cratedb_toolkit/testing/testcontainers/localstack.py index 2a340c00..18b159a5 100644 --- a/cratedb_toolkit/testing/testcontainers/localstack.py +++ b/cratedb_toolkit/testing/testcontainers/localstack.py @@ -30,7 +30,7 @@ class LocalStackContainerWithKeepalive(KeepaliveContainer, LocalStackContainer): useful when used within a test matrix. Its default value is `latest`. """ - LOCALSTACK_VERSION = os.environ.get("LOCALSTACK_VERSION", "latest") + LOCALSTACK_VERSION = os.environ.get("LOCALSTACK_VERSION", "3.7") def __init__( self, diff --git a/doc/io/dynamodb/loader.md b/doc/io/dynamodb/loader.md index 8d561e0e..978a08c0 100644 --- a/doc/io/dynamodb/loader.md +++ b/doc/io/dynamodb/loader.md @@ -25,6 +25,20 @@ ctk shell --command "SELECT * FROM testdrive.demo;" ctk show table "testdrive.demo" ``` +## Options + +### `page-size` +The source URL accepts the `page-size` option to configure DynamoDB +[pagination]. The default value is `1000`. +```shell +ctk load table .../ProductCatalog?region=us-east-1&page-size=5000 +``` + +### `consistent-read` +The source URL accepts the `consistent-read` option to configure DynamoDB +[read consistency]. The default value is `false`. + + ## Variants ### CrateDB Cloud @@ -66,3 +80,5 @@ docker run \ [Credentials for accessing LocalStack AWS API]: https://docs.localstack.cloud/references/credentials/ [Get started with DynamoDB on LocalStack]: https://docs.localstack.cloud/user-guide/aws/dynamodb/ +[pagination]: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Scan.html#Scan.Pagination +[read consistency]: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Scan.html#Scan.ReadConsistency diff --git a/pyproject.toml b/pyproject.toml index 2c9ae342..347f1e5b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -139,7 +139,7 @@ docs = [ ] dynamodb = [ "boto3", - "commons-codec>=0.0.12", + "commons-codec @ git+https://github.com/crate/commons-codec@dynamodb-full-load-batch", ] full = [ "cratedb-toolkit[cfr,cloud,datasets,io,service]", @@ -155,7 +155,7 @@ io = [ "sqlalchemy>=2", ] kinesis = [ - "commons-codec>=0.0.12", + "commons-codec @ git+https://github.com/crate/commons-codec@dynamodb-full-load-batch", "lorrystream[carabas]", ] mongodb = [ diff --git a/tests/io/dynamodb/test_adapter.py b/tests/io/dynamodb/test_adapter.py new file mode 100644 index 00000000..f8c8e7ca --- /dev/null +++ b/tests/io/dynamodb/test_adapter.py @@ -0,0 +1,42 @@ +import pytest +from botocore.exceptions import ParamValidationError +from yarl import URL + +from cratedb_toolkit.io.dynamodb.adapter import DynamoDBAdapter + +pytestmark = pytest.mark.dynamodb + + +RECORD = { + "Id": {"N": "101"}, +} + + +def test_adapter_scan_success(dynamodb): + dynamodb_url = f"{dynamodb.get_connection_url()}/demo?region=us-east-1" + adapter = DynamoDBAdapter(URL(dynamodb_url)) + adapter.scan("foo") + + +def test_adapter_scan_failure_consistent_read(dynamodb): + """ + Check supplying invalid parameters to `DynamoDBAdapter` fails as expected. + """ + dynamodb_url = f"{dynamodb.get_connection_url()}/demo?region=us-east-1" + adapter = DynamoDBAdapter(URL(dynamodb_url)) + + with pytest.raises(ParamValidationError) as ex: + next(adapter.scan("demo", consistent_read=-42, on_error="raise")) + assert ex.match("Parameter validation failed:\nInvalid type for parameter ConsistentRead, value: -42.*") + + +def test_adapter_scan_failure_page_size(dynamodb): + """ + Check supplying invalid parameters to `DynamoDBAdapter` fails as expected. + """ + dynamodb_url = f"{dynamodb.get_connection_url()}/demo?region=us-east-1" + adapter = DynamoDBAdapter(URL(dynamodb_url)) + + with pytest.raises(ParamValidationError) as ex: + next(adapter.scan("demo", page_size=-1, on_error="raise")) + assert ex.match("Parameter validation failed:\nInvalid value for parameter Limit, value: -1, valid min value: 1") diff --git a/tests/io/dynamodb/test_cli.py b/tests/io/dynamodb/test_cli.py index 80d87d63..71d33e11 100644 --- a/tests/io/dynamodb/test_cli.py +++ b/tests/io/dynamodb/test_cli.py @@ -10,8 +10,8 @@ def test_dynamodb_load_table(caplog, cratedb, dynamodb, dynamodb_test_manager): """ CLI test: Invoke `ctk load table` for DynamoDB. """ - cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo" dynamodb_url = f"{dynamodb.get_connection_url()}/ProductCatalog?region=us-east-1" + cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo" # Populate source database with sample dataset. dynamodb_test_manager.load_product_catalog() diff --git a/tests/io/dynamodb/test_copy.py b/tests/io/dynamodb/test_copy.py index 8d1d605e..2ced15b1 100644 --- a/tests/io/dynamodb/test_copy.py +++ b/tests/io/dynamodb/test_copy.py @@ -5,42 +5,22 @@ pytestmark = pytest.mark.dynamodb -RECORD_UTM = { +RECORD = { "Id": {"N": "101"}, - "utmTags": { - "L": [ - { - "M": { - "date": {"S": "2024-08-28T20:05:42.603Z"}, - "utm_adgroup": {"L": [{"S": ""}, {"S": ""}]}, - "utm_campaign": {"S": "34374686341"}, - "utm_medium": {"S": "foobar"}, - "utm_source": {"S": "google"}, - } - } - ] - }, - "location": { - "M": { - "coordinates": {"L": [{"S": ""}]}, - "meetingPoint": {"S": "At the end of the tunnel"}, - "address": {"S": "Salzbergwerk Berchtesgaden"}, - }, - }, } -def test_dynamodb_copy(caplog, cratedb, dynamodb, dynamodb_test_manager): +def test_dynamodb_copy_success(caplog, cratedb, dynamodb, dynamodb_test_manager): """ - CLI test: Invoke `ctk load table` for DynamoDB. + Verify `DynamoDBFullLoad` works as expected. """ # Define source and target URLs. - cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo" dynamodb_url = f"{dynamodb.get_connection_url()}/demo?region=us-east-1" + cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo" # Populate source database with data. - dynamodb_test_manager.load_records(table_name="demo", records=[RECORD_UTM]) + dynamodb_test_manager.load_records(table_name="demo", records=[RECORD]) # Run transfer command. table_loader = DynamoDBFullLoad(dynamodb_url=dynamodb_url, cratedb_url=cratedb_url) @@ -52,20 +32,4 @@ def test_dynamodb_copy(caplog, cratedb, dynamodb, dynamodb_test_manager): assert cratedb.database.count_records("testdrive.demo") == 1 results = cratedb.database.run_sql("SELECT * FROM testdrive.demo;", records=True) # noqa: S608 - assert results[0]["data"] == { - "Id": 101.0, - "utmTags": [ - { - "date": "2024-08-28T20:05:42.603Z", - "utm_adgroup": ["", ""], - "utm_campaign": "34374686341", - "utm_medium": "foobar", - "utm_source": "google", - } - ], - "location": { - "coordinates": [""], - "meetingPoint": "At the end of the tunnel", - "address": "Salzbergwerk Berchtesgaden", - }, - } + assert results[0]["data"] == {"Id": 101.0}