Skip to content

Commit

Permalink
[Source-postgres] : Populate airbyte_meta.changes for xmin path (#36982)
Browse files Browse the repository at this point in the history
  • Loading branch information
akashkulk authored Apr 10, 2024
1 parent 0296c43 commit fda0829
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerImageTag: 3.3.25
dockerImageTag: 3.3.26
dockerRepository: airbyte/source-postgres
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
githubIssueLabel: source-postgres
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import io.airbyte.cdk.db.JdbcCompatibleSourceOperations;
import io.airbyte.cdk.db.jdbc.AirbyteRecordData;
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.integrations.source.relationaldb.DbSourceDiscoverUtil;
import io.airbyte.cdk.integrations.source.relationaldb.RelationalDbQueryUtils;
Expand All @@ -25,6 +26,7 @@
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteMessage.Type;
import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta;
import io.airbyte.protocol.models.v0.AirbyteStream;
import io.airbyte.protocol.models.v0.CatalogHelpers;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
Expand Down Expand Up @@ -99,7 +101,7 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(
.filter(CatalogHelpers.getTopLevelFieldNames(airbyteStream)::contains)
.collect(Collectors.toList());

final AutoCloseableIterator<JsonNode> queryStream = queryTableXmin(selectedDatabaseFields, table.getNameSpace(), table.getName());
final AutoCloseableIterator<AirbyteRecordData> queryStream = queryTableXmin(selectedDatabaseFields, table.getNameSpace(), table.getName());
final AutoCloseableIterator<AirbyteMessage> recordIterator =
getRecordIterator(queryStream, streamName, namespace, emittedAt.toEpochMilli());
final AutoCloseableIterator<AirbyteMessage> recordAndMessageIterator = augmentWithState(recordIterator, airbyteStream, pair);
Expand All @@ -111,18 +113,18 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(
return iteratorList;
}

private AutoCloseableIterator<JsonNode> queryTableXmin(
final List<String> columnNames,
final String schemaName,
final String tableName) {
private AutoCloseableIterator<AirbyteRecordData> queryTableXmin(
final List<String> columnNames,
final String schemaName,
final String tableName) {
LOGGER.info("Queueing query for table: {}", tableName);
final AirbyteStreamNameNamespacePair airbyteStream =
AirbyteStreamUtils.convertFromNameAndNamespace(tableName, schemaName);
return AutoCloseableIterators.lazyIterator(() -> {
try {
final Stream<JsonNode> stream = database.unsafeQuery(
final Stream<AirbyteRecordData> stream = database.unsafeQuery(
connection -> createXminQueryStatement(connection, columnNames, schemaName, tableName, airbyteStream),
sourceOperations::rowToJson);
sourceOperations::convertDatabaseRowToAirbyteRecordData);
return AutoCloseableIterators.fromStream(stream, airbyteStream);
} catch (final SQLException e) {
throw new RuntimeException(e);
Expand Down Expand Up @@ -206,17 +208,22 @@ public static boolean shouldPerformFullSync(final XminStatus currentXminStatus,

// Transforms the given iterator to create an {@link AirbyteRecordMessage}.
// Every element of the source iterator is wrapped in an {@link AirbyteMessage} of type RECORD
// whose record carries the supplied stream name, namespace and emitted-at value together with
// the element's row data. The element's meta is attached only when it reports at least one
// change; when the meta is null or has no changes, the record's meta is set to null instead.
private static AutoCloseableIterator<AirbyteMessage> getRecordIterator(
final AutoCloseableIterator<JsonNode> recordIterator,
final AutoCloseableIterator<AirbyteRecordData> recordIterator,
final String streamName,
final String namespace,
final long emittedAt) {
return AutoCloseableIterators.transform(recordIterator, r -> new AirbyteMessage()
return AutoCloseableIterators.transform(recordIterator, airbyteRecordData -> new AirbyteMessage()
.withType(Type.RECORD)
.withRecord(new AirbyteRecordMessage()
.withStream(streamName)
.withNamespace(namespace)
.withEmittedAt(emittedAt)
.withData(r)));
.withData(airbyteRecordData.rawRowData())
// Pass null rather than an empty meta object when there are no changes to report.
.withMeta(isMetaChangesEmptyOrNull(airbyteRecordData.meta()) ? null : airbyteRecordData.meta())));
}

// Tells whether the given meta has nothing to report: true when the meta object itself is
// null, or when its list of changes is null or empty; false otherwise.
private static boolean isMetaChangesEmptyOrNull(AirbyteRecordMessageMeta meta) {
  // No meta object, or no change list at all, counts as "empty".
  if (meta == null || meta.getChanges() == null) {
    return true;
  }
  return meta.getChanges().isEmpty();
}

// Augments the given iterator with record count logs.
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/postgres.md
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp

| Version | Date | Pull Request | Subject |
|---------|------------|----------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 3.3.26 | 2024-04-10 | [36982](https://github.com/airbytehq/airbyte/pull/36982) | Populate airbyte_meta.changes for xmin path |
| 3.3.25 | 2024-04-10 | [36981](https://github.com/airbytehq/airbyte/pull/36981) | Track latest CDK |
| 3.3.24 | 2024-04-10 | [36865](https://github.com/airbytehq/airbyte/pull/36865) | Track latest CDK |
| 3.3.23 | 2024-04-02 | [36759](https://github.com/airbytehq/airbyte/pull/36759) | Track latest CDK |
Expand Down

0 comments on commit fda0829

Please sign in to comment.