Skip to content

Commit

Permalink
SNOW-1859651 Topic2TableMode for patterns
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-mbobowski committed Dec 18, 2024
1 parent bf4ff0a commit 3ec125f
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@

import com.snowflake.kafka.connector.Utils;
import java.util.Map;
import java.util.regex.Pattern;

public class TopicToTableModeExtractor {

private static Pattern topicRegexPattern =
Pattern.compile("\\[([0-9]-[0-9]|[a-z]-[a-z]|[A-Z]-[A-Z]|[!-@])\\][*+]?");

/** Defines whether single target table is fed by one or many source topics. */
public enum Topic2TableMode {
// Single topic = single table
Expand All @@ -26,6 +30,15 @@ private TopicToTableModeExtractor() {}
*/
public static Topic2TableMode determineTopic2TableMode(
Map<String, String> topic2TableMap, String topic) {

boolean anyTopicInMapIsRegex =
topic2TableMap.keySet().stream()
.anyMatch(topic2TableMapKey -> topicRegexPattern.matcher(topic2TableMapKey).find());

if (anyTopicInMapIsRegex) {
return Topic2TableMode.MANY_TOPICS_SINGLE_TABLE;
}

String tableName = Utils.tableName(topic, topic2TableMap);
return topic2TableMap.values().stream()
.filter(table -> table.equalsIgnoreCase(tableName))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.*;
import static com.snowflake.kafka.connector.Utils.HTTP_NON_PROXY_HOSTS;
import static com.snowflake.kafka.connector.config.TopicToTableModeExtractor.Topic2TableMode.MANY_TOPICS_SINGLE_TABLE;
import static com.snowflake.kafka.connector.config.TopicToTableModeExtractor.Topic2TableMode.SINGLE_TOPIC_SINGLE_TABLE;
import static com.snowflake.kafka.connector.internal.TestUtils.getConfig;
import static org.assertj.core.api.Assertions.*;
import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -221,18 +223,30 @@ public void testNameMapCovered() {
connectorConfigValidator.validateConfig(config);
}

@Test
public void testTopic2TableCorrectlyDeterminesMode() {
Map<String, String> config = getConfig();
config.put(TOPICS_TABLES_MAP, "src1:target1,src2:target2,src3:target1");
connectorConfigValidator.validateConfig(config);
Map<String, String> topic2Table = Utils.parseTopicToTableMap(config.get(TOPICS_TABLES_MAP));
assertThat(TopicToTableModeExtractor.determineTopic2TableMode(topic2Table, "src1"))
.isEqualTo(TopicToTableModeExtractor.Topic2TableMode.MANY_TOPICS_SINGLE_TABLE);
assertThat(TopicToTableModeExtractor.determineTopic2TableMode(topic2Table, "src2"))
.isEqualTo(TopicToTableModeExtractor.Topic2TableMode.SINGLE_TOPIC_SINGLE_TABLE);
assertThat(TopicToTableModeExtractor.determineTopic2TableMode(topic2Table, "src3"))
.isEqualTo(TopicToTableModeExtractor.Topic2TableMode.MANY_TOPICS_SINGLE_TABLE);
@ParameterizedTest
@MethodSource("topicToTableTestData")
public void testTopic2TableCorrectlyDeterminesMode(
String topicToTable, String topic, TopicToTableModeExtractor.Topic2TableMode expected) {
// given
Map<String, String> topic2Table = Utils.parseTopicToTableMap(topicToTable);

// when
TopicToTableModeExtractor.Topic2TableMode actual =
TopicToTableModeExtractor.determineTopic2TableMode(topic2Table, topic);

// then
assertThat(actual).isEqualTo(expected);
}

public static Stream<Arguments> topicToTableTestData() {
return Stream.of(
Arguments.of("src1:target1,src2:target2,src3:target1", "src1", MANY_TOPICS_SINGLE_TABLE),
Arguments.of("src1:target1,src2:target2,src3:target1", "src2", SINGLE_TOPIC_SINGLE_TABLE),
Arguments.of("topic[0-9]:tableA", "tableA", MANY_TOPICS_SINGLE_TABLE),
Arguments.of("to[0-9]pic:tableA", "tableA", MANY_TOPICS_SINGLE_TABLE),
Arguments.of("[0-9]topic:tableA", "tableA", MANY_TOPICS_SINGLE_TABLE),
Arguments.of("topic[a-z]:tableA", "tableA", MANY_TOPICS_SINGLE_TABLE),
Arguments.of("topic[0-9]:tableA", "randomTopicName", MANY_TOPICS_SINGLE_TABLE));
}

@Test
Expand Down

0 comments on commit 3ec125f

Please sign in to comment.