diff --git a/CHANGELOG.md b/CHANGELOG.md index ff7f2611..2a8bb663 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,7 @@ ## Release History +### 1.15.0 (2024-04-18) +#### Key Bug Fixes +* Fixed an issue where using `CosmosDBSinkConnector` in bulk mode failed to write items for container with nested partition key path - [PR 565](https://github.com/microsoft/kafka-connect-cosmosdb/pull/565) ### 1.14.2 (2024-03-12) #### Key Bug Fixes diff --git a/pom.xml b/pom.xml index bd35896f..b921ee85 100644 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,7 @@ com.azure.cosmos.kafka kafka-connect-cosmos - 1.14.2 + 1.15.0 kafka-connect-cosmos https://github.com/microsoft/kafka-connect-cosmosdb @@ -48,7 +48,7 @@ com.azure azure-cosmos - 4.56.0 + 4.58.0 com.jayway.jsonpath diff --git a/src/main/java/com/azure/cosmos/kafka/connect/sink/BulkWriter.java b/src/main/java/com/azure/cosmos/kafka/connect/sink/BulkWriter.java index 5088e703..6fb96be7 100644 --- a/src/main/java/com/azure/cosmos/kafka/connect/sink/BulkWriter.java +++ b/src/main/java/com/azure/cosmos/kafka/connect/sink/BulkWriter.java @@ -22,10 +22,10 @@ import java.util.ArrayList; import java.util.Collections; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.LinkedHashMap; import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkArgument; import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull; @@ -120,16 +120,8 @@ protected SinkWriteResponse writeCore(List sinkRecords) { private PartitionKey getPartitionKeyValue(Object recordValue) { checkArgument(recordValue instanceof Map, "Argument 'recordValue' is not valid map format."); - //TODO: examine the code here for sub-partition - String partitionKeyPath = StringUtils.join(this.partitionKeyDefinition.getPaths(), ""); Map recordMap = (Map) recordValue; - Object partitionKeyValue = recordMap.get(partitionKeyPath.substring(1)); - PartitionKeyInternal partitionKeyInternal = PartitionKeyInternal.fromObjectArray(Collections.singletonList(partitionKeyValue), false); - - return ImplementationBridgeHelpers - .PartitionKeyHelper - .getPartitionKeyAccessor() - .toPartitionKey(partitionKeyInternal); + return PartitionKey.fromItem(recordMap, this.partitionKeyDefinition); } BulkOperationFailedException handleErrorStatusCode( diff --git a/src/test/java/com/azure/cosmos/kafka/connect/sink/BulkWriterTests.java b/src/test/java/com/azure/cosmos/kafka/connect/sink/BulkWriterTests.java index 7ed57a90..529927c4 100644 --- a/src/test/java/com/azure/cosmos/kafka/connect/sink/BulkWriterTests.java +++ b/src/test/java/com/azure/cosmos/kafka/connect/sink/BulkWriterTests.java @@ -8,6 +8,7 @@ import com.azure.cosmos.implementation.BadRequestException; import com.azure.cosmos.implementation.HttpConstants; import com.azure.cosmos.implementation.RequestTimeoutException; +import com.azure.cosmos.implementation.Utils; import com.azure.cosmos.models.CosmosBulkItemResponse; import com.azure.cosmos.models.CosmosBulkOperationResponse; import com.azure.cosmos.models.CosmosBulkOperations; @@ -16,6 +17,9 @@ import com.azure.cosmos.models.CosmosItemOperation; import com.azure.cosmos.models.PartitionKey; import com.azure.cosmos.models.PartitionKeyDefinition; +import com.azure.cosmos.models.PartitionKind; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.connect.data.ConnectSchema; import org.apache.kafka.connect.data.Schema; @@ -31,11 +35,13 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.UUID; import java.util.Random; +import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; +import static junit.framework.Assert.assertNotNull; import static junit.framework.TestCase.assertEquals; +import static junit.framework.TestCase.assertFalse; import static junit.framework.TestCase.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.times; @@ -59,6 +65,7 @@ public void setup(){ PartitionKeyDefinition mockedPartitionKeyDefinition = Mockito.mock(PartitionKeyDefinition.class); Mockito.when(mockedContainerProperties.getPartitionKeyDefinition()).thenReturn(mockedPartitionKeyDefinition); Mockito.when(mockedPartitionKeyDefinition.getPaths()).thenReturn(Arrays.asList("/id")); + Mockito.when(mockedPartitionKeyDefinition.getKind()).thenReturn(PartitionKind.HASH); bulkWriter = new BulkWriter(container, MAX_RETRY_COUNT, COMPRESSION_ENABLED); } @@ -207,6 +214,65 @@ public void testBulkWriteFailedWithTransientException() { assertEquals(HttpConstants.StatusCodes.REQUEST_TIMEOUT, ((CosmosException)response.getFailedRecordResponses().get(0).getException()).getStatusCode()); } + @Test + public void testBulkWriteForContainerWithNestedPartitionKey() { + CosmosContainer containerWithNestedPartitionKey = Mockito.mock(CosmosContainer.class); + + CosmosContainerResponse mockedContainerResponse = Mockito.mock(CosmosContainerResponse.class); + Mockito.when(containerWithNestedPartitionKey.read()).thenReturn(mockedContainerResponse); + CosmosContainerProperties mockedContainerProperties = Mockito.mock(CosmosContainerProperties.class); + Mockito.when(mockedContainerResponse.getProperties()).thenReturn(mockedContainerProperties); + PartitionKeyDefinition mockedPartitionKeyDefinition = Mockito.mock(PartitionKeyDefinition.class); + Mockito.when(mockedContainerProperties.getPartitionKeyDefinition()).thenReturn(mockedPartitionKeyDefinition); + Mockito.when(mockedPartitionKeyDefinition.getPaths()).thenReturn(Arrays.asList("/location/city/zipCode")); + Mockito.when(mockedPartitionKeyDefinition.getKind()).thenReturn(PartitionKind.HASH); + + BulkWriter testWriter = new BulkWriter(containerWithNestedPartitionKey, MAX_RETRY_COUNT, COMPRESSION_ENABLED); + + String itemId = UUID.randomUUID().toString(); + String pkValue = "1234"; + + ObjectNode objectNode = Utils.getSimpleObjectMapper().createObjectNode(); + objectNode.put("id", itemId); + + ObjectNode locationNode = Utils.getSimpleObjectMapper().createObjectNode(); + ObjectNode cityNode = Utils.getSimpleObjectMapper().createObjectNode(); + cityNode.put("zipCode", pkValue); + locationNode.put("city", cityNode); + objectNode.put("location", locationNode); + + SinkRecord sinkRecord = + new SinkRecord( + TOPIC_NAME, + 1, + new ConnectSchema(org.apache.kafka.connect.data.Schema.Type.STRING), + objectNode.get("id"), + new ConnectSchema(org.apache.kafka.connect.data.Schema.Type.MAP), + Utils.getSimpleObjectMapper().convertValue(objectNode, new TypeReference>() {}), + 0L); + + // setup successful item response + List> mockedBulkOperationResponseList = new ArrayList<>(); + mockedBulkOperationResponseList.add(mockSuccessfulBulkOperationResponse(sinkRecord, itemId)); + + ArgumentCaptor> parameters = ArgumentCaptor.forClass(Iterable.class); + Mockito + .when(containerWithNestedPartitionKey.executeBulkOperations(parameters.capture())) + .thenReturn(() -> mockedBulkOperationResponseList.iterator()); + + testWriter.write(Arrays.asList(sinkRecord)); + + Iterator bulkExecutionParameters = parameters.getValue().iterator(); + + assertTrue(bulkExecutionParameters.hasNext()); + CosmosItemOperation bulkItemOperation = bulkExecutionParameters.next(); + assertNotNull(bulkItemOperation.getPartitionKeyValue()); + assertEquals(bulkItemOperation.getPartitionKeyValue(), new PartitionKey(pkValue)); + + // there should only be 1 operation + assertFalse(bulkExecutionParameters.hasNext()); + } + private SinkRecord createSinkRecord(String id) { Schema stringSchema = new ConnectSchema(Schema.Type.STRING); Schema mapSchema = new ConnectSchema(Schema.Type.MAP); diff --git a/src/test/java/com/azure/cosmos/kafka/connect/sink/integration/SinkConnectorIT.java b/src/test/java/com/azure/cosmos/kafka/connect/sink/integration/SinkConnectorIT.java index 4e8b96a0..afd3a6e4 100644 --- a/src/test/java/com/azure/cosmos/kafka/connect/sink/integration/SinkConnectorIT.java +++ b/src/test/java/com/azure/cosmos/kafka/connect/sink/integration/SinkConnectorIT.java @@ -7,19 +7,24 @@ import com.azure.cosmos.CosmosClientBuilder; import com.azure.cosmos.CosmosContainer; import com.azure.cosmos.CosmosDatabase; +import com.azure.cosmos.implementation.Utils; import com.azure.cosmos.kafka.connect.ConnectorTestConfigurations; +import com.azure.cosmos.kafka.connect.sink.BulkWriter; import com.azure.cosmos.kafka.connect.sink.id.strategy.ProvidedInKeyStrategy; import com.azure.cosmos.kafka.connect.sink.id.strategy.ProvidedInValueStrategy; import com.azure.cosmos.kafka.connect.sink.id.strategy.TemplateStrategy; import com.azure.cosmos.models.CosmosContainerProperties; import com.azure.cosmos.models.CosmosQueryRequestOptions; +import com.azure.cosmos.models.PartitionKey; import com.azure.cosmos.models.ThroughputProperties; import com.azure.cosmos.util.CosmosPagedIterable; import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; @@ -29,7 +34,9 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.connect.data.ConnectSchema; import org.apache.kafka.connect.json.JsonSerializer; +import org.apache.kafka.connect.sink.SinkRecord; import org.sourcelab.kafka.connect.apiclient.Configuration; import org.sourcelab.kafka.connect.apiclient.KafkaConnectClient; import org.sourcelab.kafka.connect.apiclient.request.dto.NewConnectorDefinition; @@ -48,10 +55,14 @@ import java.io.IOException; import java.net.URISyntaxException; import java.net.URL; +import java.util.Arrays; +import java.util.Map; import java.util.Optional; import java.util.Properties; +import java.util.UUID; import java.util.concurrent.ExecutionException; +import static junit.framework.Assert.fail; import static org.apache.kafka.common.utils.Utils.sleep; import static org.sourcelab.kafka.connect.apiclient.request.dto.NewConnectorDefinition.Builder; @@ -86,7 +97,7 @@ public class SinkConnectorIT { * Create an embedded Kafka Connect cluster. */ @Before - public void before() throws URISyntaxException, IOException { + public void before() throws IOException { // Load the sink.config.json config file URL configFileUrl = SinkConnectorIT.class.getClassLoader().getResource("sink.config.json"); @@ -590,6 +601,60 @@ public void testPostJsonMessageWithTTL() throws InterruptedException, ExecutionE Assert.assertFalse("Record still in DB", retrievedPerson.isPresent()); } + @Test + public void testBulkWriteForContainerWithNestedPartitionKey() { + // verify bulk writer can create records successfully for container with nested partition key path + CosmosDatabase database = null; + try { + // create a container with nested partition key + database = cosmosClient.getDatabase(UUID.randomUUID().toString()); + cosmosClient.createDatabaseIfNotExists(database.getId()); + + String containerWithNestedPartitionKey = UUID.randomUUID().toString(); + cosmosClient + .getDatabase(database.getId()) + .createContainerIfNotExists(containerWithNestedPartitionKey, "/location/city/zipCode"); + CosmosContainer testContainer = cosmosClient.getDatabase(database.getId()).getContainer(containerWithNestedPartitionKey); + + String itemId = UUID.randomUUID().toString(); + String pkValue = "1234"; + + ObjectNode objectNode = Utils.getSimpleObjectMapper().createObjectNode(); + objectNode.put("id", itemId); + ObjectNode locationNode = Utils.getSimpleObjectMapper().createObjectNode(); + ObjectNode cityNode = Utils.getSimpleObjectMapper().createObjectNode(); + cityNode.put("zipCode", pkValue); + locationNode.put("city", cityNode); + objectNode.put("location", locationNode); + + SinkRecord sinkRecord = + new SinkRecord( + kafkaTopicJson, + 1, + new ConnectSchema(org.apache.kafka.connect.data.Schema.Type.STRING), + objectNode.get("id"), + new ConnectSchema(org.apache.kafka.connect.data.Schema.Type.MAP), + Utils.getSimpleObjectMapper().convertValue(objectNode, new TypeReference>() {}), + 0L); + + BulkWriter testWriter = new BulkWriter(testContainer, 1, false); + testWriter.write(Arrays.asList(sinkRecord)); + + // verify the item is created successfully + try { + testContainer.readItem(itemId, new PartitionKey(pkValue), ObjectNode.class).getItem(); + } catch (Exception e) { + fail("Should be able to read item " + e.getMessage()); + } + } finally { + if (cosmosClient != null) { + if (database != null) { + database.delete(); + } + } + } + } + /** * A simple entity to serialize to/deserialize from JSON in tests. */