From c9a573213f6f8f2fd687839f05df6415558d7728 Mon Sep 17 00:00:00 2001 From: Sudesh Date: Sat, 21 Dec 2024 15:03:41 +0530 Subject: [PATCH] Reproduce IndexOutOfBoundsException --- .../streaming/TopicPartitionChannelIT.java | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelIT.java b/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelIT.java index 85b2d5442..fae6937ff 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelIT.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelIT.java @@ -764,7 +764,7 @@ private void testInsertRowsWithGaps(boolean withSchematization, boolean useSingl // create tpChannel SnowflakeSinkService service = SnowflakeSinkServiceFactory.builder(conn, IngestionMethodConfig.SNOWPIPE_STREAMING, config) - .setRecordNumber(1) + .setRecordNumber(4) .setErrorReporter(new InMemoryKafkaRecordErrorReporter()) .setSinkTaskContext(new InMemorySinkTaskContext(Collections.singleton(topicPartition))) .addTask(testTableName, topicPartition) @@ -789,19 +789,21 @@ private void testInsertRowsWithGaps(boolean withSchematization, boolean useSingl i)); } - service.insert(blankRecords); - TestUtils.assertWithRetry( - () -> service.getOffset(new TopicPartition(topic, PARTITION)) == 2, 20, 5); +// service.insert(blankRecords); +// TestUtils.assertWithRetry( +// () -> service.getOffset(new TopicPartition(topic, PARTITION)) == 2, 20, 5); // Insert another two records with offset gap that requires evolution: 3, 4 - List gapRecords = TestUtils.createNativeJsonSinkRecords(2, 3, topic, PARTITION); + List gapRecords = TestUtils.createNativeJsonSinkRecords(300, 3, topic, PARTITION); gapRecords.remove(0); - service.insert(gapRecords); + + blankRecords.addAll(gapRecords); + service.insert(blankRecords); // With schematization, we need to resend a new batch should succeed even if there is an offset // gap from the previous committed offset if (withSchematization) { - service.insert(gapRecords); + service.insert(blankRecords); } TestUtils.assertWithRetry(