Skip to content

Commit

Permalink
[Source-mongo] : Add options for CAPTURE_MODE (#36851)
Browse files Browse the repository at this point in the history
  • Loading branch information
akashkulk authored May 6, 2024
1 parent d74125b commit ddc3d2b
Show file tree
Hide file tree
Showing 9 changed files with 122 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ java {
}

dependencies {
implementation 'io.debezium:debezium-embedded:2.4.0.Final'
implementation 'io.debezium:debezium-connector-mongodb:2.4.0.Final'
implementation 'io.debezium:debezium-embedded:2.5.1.Final'
implementation 'io.debezium:debezium-connector-mongodb:2.5.1.Final'

testImplementation 'org.testcontainers:mongodb:1.19.0'

Expand All @@ -53,8 +53,8 @@ dependencies {
dataGeneratorImplementation 'org.jetbrains.kotlinx:kotlinx-cli-jvm:0.3.5'
dataGeneratorImplementation 'org.mongodb:mongodb-driver-sync:4.10.2'

debeziumTestImplementation 'io.debezium:debezium-embedded:2.4.0.Final'
debeziumTestImplementation 'io.debezium:debezium-connector-mongodb:2.4.0.Final'
debeziumTestImplementation 'io.debezium:debezium-embedded:2.5.1.Final'
debeziumTestImplementation 'io.debezium:debezium-connector-mongodb:2.5.1.Final'
debeziumTestImplementation 'org.jetbrains.kotlinx:kotlinx-cli-jvm:0.3.5'
debeziumTestImplementation 'com.github.spotbugs:spotbugs-annotations:4.7.3'
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,15 @@
"default": "Fail sync",
"order": 11,
"group": "advanced"
},
"update_capture_mode": {
"type": "string",
"title": "Capture mode (Advanced)",
"description": "Determines how Airbyte looks up the value of an updated document. If 'Lookup' is chosen, the current value of the document will be read. If 'Post Image' is chosen, then the version of the document immediately after an update will be read. WARNING : Severe data loss will occur if this option is chosen and the appropriate settings are not set on your Mongo instance : https://www.mongodb.com/docs/manual/changeStreams/#change-streams-with-document-pre-and-post-images.",
"enum": ["Lookup", "Post Image"],
"default": "Lookup",
"order": 12,
"group": "advanced"
}
},
"groups": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e
dockerImageTag: 1.3.11
dockerImageTag: 1.3.12
dockerRepository: airbyte/source-mongodb-v2
documentationUrl: https://docs.airbyte.com/integrations/sources/mongodb-v2
githubIssueLabel: source-mongodb-v2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ public class MongoConstants {
public static final String FAIL_SYNC_OPTION = "Fail sync";
public static final String RESYNC_DATA_OPTION = "Re-sync data";

public static final String UPDATE_CAPTURE_MODE = "update_capture_mode";

public static final String CAPTURE_MODE_LOOKUP_OPTION = "Lookup";
public static final String CAPTURE_MODE_POST_IMAGE_OPTION = "Post Image";

private MongoConstants() {}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.integrations.source.mongodb;

import static io.airbyte.integrations.source.mongodb.MongoConstants.AUTH_SOURCE_CONFIGURATION_KEY;
import static io.airbyte.integrations.source.mongodb.MongoConstants.CAPTURE_MODE_LOOKUP_OPTION;
import static io.airbyte.integrations.source.mongodb.MongoConstants.CHECKPOINT_INTERVAL;
import static io.airbyte.integrations.source.mongodb.MongoConstants.CHECKPOINT_INTERVAL_CONFIGURATION_KEY;
import static io.airbyte.integrations.source.mongodb.MongoConstants.DATABASE_CONFIGURATION_KEY;
Expand All @@ -18,9 +19,11 @@
import static io.airbyte.integrations.source.mongodb.MongoConstants.PASSWORD_CONFIGURATION_KEY;
import static io.airbyte.integrations.source.mongodb.MongoConstants.RESYNC_DATA_OPTION;
import static io.airbyte.integrations.source.mongodb.MongoConstants.SCHEMA_ENFORCED_CONFIGURATION_KEY;
import static io.airbyte.integrations.source.mongodb.MongoConstants.UPDATE_CAPTURE_MODE;
import static io.airbyte.integrations.source.mongodb.MongoConstants.USERNAME_CONFIGURATION_KEY;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.OptionalInt;

/**
Expand All @@ -41,7 +44,12 @@ public record MongoDbSourceConfig(JsonNode rawConfig) {
}

public JsonNode getDatabaseConfig() {
return rawConfig.get(DATABASE_CONFIG_CONFIGURATION_KEY);
JsonNode rawDbConfigNode = rawConfig.get(DATABASE_CONFIG_CONFIGURATION_KEY);
// Add other properties to the raw db config. Unfortunately, due to the setup of the config json,
// other connection properties need to
// be added to this config.
addAdvancedPropertiesToDatabaseConfig(rawDbConfigNode);
return rawDbConfigNode;
}

public String getAuthSource() {
Expand Down Expand Up @@ -107,4 +115,16 @@ public boolean shouldFailSyncOnInvalidCursor() {
}
}

public String getUpdateCaptureMode() {
if (rawConfig.has(UPDATE_CAPTURE_MODE)) {
return rawConfig.get(UPDATE_CAPTURE_MODE).asText();
} else {
return CAPTURE_MODE_LOOKUP_OPTION;
}
}

private void addAdvancedPropertiesToDatabaseConfig(JsonNode dbConfig) {
((ObjectNode) dbConfig).put(UPDATE_CAPTURE_MODE, getUpdateCaptureMode());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package io.airbyte.integrations.source.mongodb.cdc;

import static io.airbyte.integrations.source.mongodb.MongoConstants.CAPTURE_MODE_POST_IMAGE_OPTION;
import static io.airbyte.integrations.source.mongodb.MongoConstants.UPDATE_CAPTURE_MODE;
import static io.airbyte.integrations.source.mongodb.cdc.MongoDbDebeziumConstants.Configuration.AUTH_SOURCE_CONFIGURATION_KEY;
import static io.airbyte.integrations.source.mongodb.cdc.MongoDbDebeziumConstants.Configuration.CONNECTION_STRING_CONFIGURATION_KEY;
import static io.airbyte.integrations.source.mongodb.cdc.MongoDbDebeziumConstants.Configuration.CREDENTIALS_PLACEHOLDER;
Expand Down Expand Up @@ -31,6 +33,9 @@ public class MongoDbDebeziumPropertiesManager extends DebeziumPropertiesManager

static final String COLLECTION_INCLUDE_LIST_KEY = "collection.include.list";
static final String DATABASE_INCLUDE_LIST_KEY = "database.include.list";

static final String MONGODB_POST_IMAGE_KEY = "capture.mode.full.update.type";
static final String MONGODB_POST_IMAGE_VALUE = "post_image";
static final String CAPTURE_TARGET_KEY = "capture.target";
static final String DOUBLE_QUOTES_PATTERN = "\"";
static final String MONGODB_AUTHSOURCE_KEY = "mongodb.authsource";
Expand Down Expand Up @@ -65,6 +70,9 @@ protected Properties getConnectionConfiguration(final JsonNode config) {
properties.setProperty(MONGODB_AUTHSOURCE_KEY, config.get(AUTH_SOURCE_CONFIGURATION_KEY).asText());
}
properties.setProperty(MONGODB_SSL_ENABLED_KEY, MONGODB_SSL_ENABLED_VALUE);
if (config.has(UPDATE_CAPTURE_MODE) && config.get(UPDATE_CAPTURE_MODE).asText().equals(CAPTURE_MODE_POST_IMAGE_OPTION)) {
properties.setProperty(MONGODB_POST_IMAGE_KEY, MONGODB_POST_IMAGE_VALUE);
}
return properties;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,15 @@
"default": "Fail sync",
"order": 11,
"group": "advanced"
},
"update_capture_mode": {
"type": "string",
"title": "Capture mode (Advanced)",
"description": "Determines how Airbyte looks up the value of an updated document. If 'Lookup' is chosen, the current value of the document will be read. If 'Post Image' is chosen, then the version of the document immediately after an update will be read. WARNING : Severe data loss will occur if this option is chosen and the appropriate settings are not set on your Mongo instance : https://www.mongodb.com/docs/manual/changeStreams/#change-streams-with-document-pre-and-post-images.",
"enum": ["Lookup", "Post Image"],
"default": "Lookup",
"order": 12,
"group": "advanced"
}
},
"groups": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import static io.airbyte.integrations.source.mongodb.cdc.MongoDbDebeziumPropertiesManager.MONGODB_CONNECTION_MODE_VALUE;
import static io.airbyte.integrations.source.mongodb.cdc.MongoDbDebeziumPropertiesManager.MONGODB_CONNECTION_STRING_KEY;
import static io.airbyte.integrations.source.mongodb.cdc.MongoDbDebeziumPropertiesManager.MONGODB_PASSWORD_KEY;
import static io.airbyte.integrations.source.mongodb.cdc.MongoDbDebeziumPropertiesManager.MONGODB_POST_IMAGE_KEY;
import static io.airbyte.integrations.source.mongodb.cdc.MongoDbDebeziumPropertiesManager.MONGODB_POST_IMAGE_VALUE;
import static io.airbyte.integrations.source.mongodb.cdc.MongoDbDebeziumPropertiesManager.MONGODB_SSL_ENABLED_KEY;
import static io.airbyte.integrations.source.mongodb.cdc.MongoDbDebeziumPropertiesManager.MONGODB_SSL_ENABLED_VALUE;
import static io.airbyte.integrations.source.mongodb.cdc.MongoDbDebeziumPropertiesManager.MONGODB_USER_KEY;
Expand All @@ -34,6 +36,7 @@
import io.airbyte.cdk.integrations.debezium.internals.AirbyteFileOffsetBackingStore;
import io.airbyte.cdk.integrations.debezium.internals.AirbyteSchemaHistoryStorage;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.source.mongodb.MongoConstants;
import io.airbyte.protocol.models.v0.AirbyteStream;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
Expand Down Expand Up @@ -80,6 +83,65 @@ void testDebeziumProperties() {
assertEquals(DATABASE_NAME, debeziumProperties.get(DATABASE_INCLUDE_LIST_KEY));
}

@Test
void testDebeziumProperties_captureMode_lookup() {
final List<ConfiguredAirbyteStream> streams = createStreams(4);
final AirbyteFileOffsetBackingStore offsetManager = mock(AirbyteFileOffsetBackingStore.class);
final ConfiguredAirbyteCatalog catalog = mock(ConfiguredAirbyteCatalog.class);
JsonNode config = createConfiguration(Optional.of("username"), Optional.of("password"), Optional.of("admin"));
((ObjectNode) config).put(MongoConstants.UPDATE_CAPTURE_MODE, MongoConstants.CAPTURE_MODE_LOOKUP_OPTION);

when(catalog.getStreams()).thenReturn(streams);

final Properties cdcProperties = new Properties();
cdcProperties.put("test", "value");

final var debeziumPropertiesManager = new MongoDbDebeziumPropertiesManager(cdcProperties, config, catalog);

final Properties debeziumProperties = debeziumPropertiesManager.getDebeziumProperties(offsetManager);
assertEquals(21 + cdcProperties.size(), debeziumProperties.size());
assertEquals(MongoDbDebeziumPropertiesManager.normalizeName(DATABASE_NAME), debeziumProperties.get(NAME_KEY));
assertEquals(MongoDbDebeziumPropertiesManager.normalizeName(DATABASE_NAME), debeziumProperties.get(TOPIC_PREFIX_KEY));
assertEquals(EXPECTED_CONNECTION_STRING, debeziumProperties.get(MONGODB_CONNECTION_STRING_KEY));
assertEquals(MONGODB_CONNECTION_MODE_VALUE, debeziumProperties.get(MONGODB_CONNECTION_MODE_KEY));
assertEquals(config.get(USERNAME_CONFIGURATION_KEY).asText(), debeziumProperties.get(MONGODB_USER_KEY));
assertEquals(config.get(PASSWORD_CONFIGURATION_KEY).asText(), debeziumProperties.get(MONGODB_PASSWORD_KEY));
assertEquals(config.get(AUTH_SOURCE_CONFIGURATION_KEY).asText(), debeziumProperties.get(MONGODB_AUTHSOURCE_KEY));
assertEquals(MONGODB_SSL_ENABLED_VALUE, debeziumProperties.get(MONGODB_SSL_ENABLED_KEY));
assertEquals(debeziumPropertiesManager.createCollectionIncludeString(streams), debeziumProperties.get(COLLECTION_INCLUDE_LIST_KEY));
assertEquals(DATABASE_NAME, debeziumProperties.get(DATABASE_INCLUDE_LIST_KEY));
}

@Test
void testDebeziumProperties_captureMode_postImage() {
final List<ConfiguredAirbyteStream> streams = createStreams(4);
final AirbyteFileOffsetBackingStore offsetManager = mock(AirbyteFileOffsetBackingStore.class);
final ConfiguredAirbyteCatalog catalog = mock(ConfiguredAirbyteCatalog.class);
JsonNode config = createConfiguration(Optional.of("username"), Optional.of("password"), Optional.of("admin"));
((ObjectNode) config).put(MongoConstants.UPDATE_CAPTURE_MODE, MongoConstants.CAPTURE_MODE_POST_IMAGE_OPTION);

when(catalog.getStreams()).thenReturn(streams);

final Properties cdcProperties = new Properties();
cdcProperties.put("test", "value");

final var debeziumPropertiesManager = new MongoDbDebeziumPropertiesManager(cdcProperties, config, catalog);

final Properties debeziumProperties = debeziumPropertiesManager.getDebeziumProperties(offsetManager);
assertEquals(22 + cdcProperties.size(), debeziumProperties.size());
assertEquals(MongoDbDebeziumPropertiesManager.normalizeName(DATABASE_NAME), debeziumProperties.get(NAME_KEY));
assertEquals(MongoDbDebeziumPropertiesManager.normalizeName(DATABASE_NAME), debeziumProperties.get(TOPIC_PREFIX_KEY));
assertEquals(EXPECTED_CONNECTION_STRING, debeziumProperties.get(MONGODB_CONNECTION_STRING_KEY));
assertEquals(MONGODB_CONNECTION_MODE_VALUE, debeziumProperties.get(MONGODB_CONNECTION_MODE_KEY));
assertEquals(config.get(USERNAME_CONFIGURATION_KEY).asText(), debeziumProperties.get(MONGODB_USER_KEY));
assertEquals(config.get(PASSWORD_CONFIGURATION_KEY).asText(), debeziumProperties.get(MONGODB_PASSWORD_KEY));
assertEquals(config.get(AUTH_SOURCE_CONFIGURATION_KEY).asText(), debeziumProperties.get(MONGODB_AUTHSOURCE_KEY));
assertEquals(MONGODB_SSL_ENABLED_VALUE, debeziumProperties.get(MONGODB_SSL_ENABLED_KEY));
assertEquals(debeziumPropertiesManager.createCollectionIncludeString(streams), debeziumProperties.get(COLLECTION_INCLUDE_LIST_KEY));
assertEquals(DATABASE_NAME, debeziumProperties.get(DATABASE_INCLUDE_LIST_KEY));
assertEquals(MONGODB_POST_IMAGE_VALUE, debeziumProperties.get(MONGODB_POST_IMAGE_KEY));
}

@Test
void testDebeziumPropertiesConnectionStringCredentialsPlaceholder() {
final List<ConfiguredAirbyteStream> streams = createStreams(4);
Expand Down
4 changes: 3 additions & 1 deletion docs/integrations/sources/mongodb-v2.md
Original file line number Diff line number Diff line change
Expand Up @@ -210,17 +210,19 @@ When importing a large MongoDB collection for the first time, the import duratio
| Username | The username which is used to access the database. Required for MongoDB Atlas clusters. |
| Password | The password associated with this username. Required for MongoDB Atlas clusters. |
| Authentication Source | (MongoDB Atlas clusters only) Specifies the database that the supplied credentials should be validated against. Defaults to `admin`. See the [MongoDB documentation](https://www.mongodb.com/docs/manual/reference/connection-string/#mongodb-urioption-urioption.authSource) for more details. |
| Schema Enforced | Controls whether schema is discovered and enforced. See discussion in [Schema Enforcement](#Schema-Enforcement). |
| Schema Enforced | Controls whether schema is discovered and enforced. See discussion in [Schema Enforcement](#Schema-Enforcement). |
| Initial Waiting Time in Seconds (Advanced) | The amount of time the connector will wait when it launches to determine if there is new data to sync or not. Defaults to 300 seconds. Valid range: 120 seconds to 1200 seconds. |
| Size of the queue (Advanced) | The size of the internal queue. This may interfere with memory consumption and efficiency of the connector, please be careful. |
| Discovery Sample Size (Advanced) | The maximum number of documents to sample when attempting to discover the unique fields for a collection. Default is 10,000 with a valid range of 1,000 to 100,000. See the [MongoDB sampling method](https://www.mongodb.com/docs/compass/current/sampling/#sampling-method) for more details. |
| Update Capture Mode (Advanced) | Determines how Airbyte looks up the value of an updated document. Default is "Lookup". **IMPORTANT** : "Post image" is only supported in MongoDB version 6.0+. In addition, the collections of interest must be setup to [return pre and post images](https://www.mongodb.com/docs/manual/changeStreams/#change-streams-with-document-pre-and-post-images). Failure to do so will lead to data loss. |

For more information regarding configuration parameters, please see [MongoDb Documentation](https://docs.mongodb.com/drivers/java/sync/v4.10/fundamentals/connection/).

## Changelog

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:----------------------------------------------------------------------------------------------------------|
| 1.3.12 | 2024-05-07 | [36851](https://github.com/airbytehq/airbyte/pull/36851) | Upgrade debezium to version 2.5.1. |
| 1.3.11 | 2024-05-02 | [37753](https://github.com/airbytehq/airbyte/pull/37753) | Chunk size(limit) should correspond to ~1GB of data. |
| 1.3.10 | 2024-05-02 | [37781](https://github.com/airbytehq/airbyte/pull/37781) | Adopt latest CDK. |
| 1.3.9 | 2024-05-01 | [37742](https://github.com/airbytehq/airbyte/pull/37742) | Adopt latest CDK. Remove Debezium retries. |
Expand Down

0 comments on commit ddc3d2b

Please sign in to comment.