refactor the way we execute query, plus format.sh
sfc-gh-bzabek committed Nov 26, 2024
1 parent 5f00f8f commit da4bbf9
Showing 12 changed files with 50 additions and 56 deletions.
@@ -113,14 +113,12 @@ public interface SnowflakeConnectionService {
void appendColumnsToTable(String tableName, Map<String, ColumnInfos> columnInfosMap);

/**
* Alter iceberg table to add columns according to a map from columnNames to their types
* Execute queries to add columns or modify Iceberg table columns
*
* @param tableName the name of the table
* @param addColumnsQuery
* @param alterSetDataTypeQuery
* @param query ADD COLUMN or ALTER SET DATA TYPE query
*/
void appendColumnsToIcebergTable(
String tableName, Optional<String> addColumnsQuery, Optional<String> alterSetDataTypeQuery);
void evolveIcebergColumns(String tableName, String query);

/**
* Alter table to drop non-nullability of a list of columns
@@ -502,24 +502,16 @@ public boolean hasSchemaEvolutionPermission(String tableName, String role) {
* Execute queries to add columns or modify Iceberg table columns
*
* @param tableName the name of the table
* @param addColumnsQuery ADD COLUMN query
* @param alterSetDataTypeQuery SET DATA TYPE query
* @param query ADD COLUMN or ALTER SET DATA TYPE query
*/
@Override
public void appendColumnsToIcebergTable(
String tableName, Optional<String> addColumnsQuery, Optional<String> alterSetDataTypeQuery) {
public void evolveIcebergColumns(String tableName, String query) {
LOGGER.debug("Appending columns to iceberg table");
InternalUtils.assertNotEmpty("tableName", tableName);
checkConnection();

if (addColumnsQuery.isPresent()) {
executeStatement(tableName, addColumnsQuery.get());
LOGGER.info("Query SUCCEEDED: " + addColumnsQuery);
}
if (alterSetDataTypeQuery.isPresent()) {
executeStatement(tableName, alterSetDataTypeQuery.get());
LOGGER.info("Query SUCCEEDED: " + addColumnsQuery);
}
executeStatement(tableName, query);
LOGGER.info("Query SUCCEEDED: " + query);
}

private void executeStatement(String tableName, String query) {
@@ -541,11 +533,11 @@ private void executeStatement(String tableName, String query) {
* @param columnInfosMap the mapping from the columnNames to their infos
*/
@Override
public void appendColumnsToTable(
String tableName, Map<String, ColumnInfos> columnInfosMap) {
public void appendColumnsToTable(String tableName, Map<String, ColumnInfos> columnInfosMap) {
checkConnection();
InternalUtils.assertNotEmpty("tableName", tableName);
StringBuilder appendColumnQuery = new StringBuilder("alter table identifier(?) add column if not exists ");
StringBuilder appendColumnQuery =
new StringBuilder("alter table identifier(?) add column if not exists ");

boolean first = true;
StringBuilder logColumn = new StringBuilder("[");
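For context: callers now issue each evolution statement separately through the new single-query method, as the IcebergSchemaEvolutionService hunk further down shows. A minimal sketch of that calling pattern, with a hypothetical helper class and illustrative query strings (the exact SQL the connector generates is not part of this hunk):

import java.util.Optional;
import com.snowflake.kafka.connector.internal.SnowflakeConnectionService;

class EvolveIcebergColumnsSketch {

  // Sketch only: the query strings are placeholders, not the connector's generated SQL.
  static void evolve(SnowflakeConnectionService conn, String tableName) {
    Optional<String> addColumnsQuery =
        Optional.of("ALTER ICEBERG TABLE identifier(?) ADD COLUMN new_col VARCHAR");
    Optional<String> alterSetDataTypeQuery = Optional.empty();

    // Each query runs on its own; an empty Optional simply means no statement is executed.
    addColumnsQuery.ifPresent(q -> conn.evolveIcebergColumns(tableName, q));
    alterSetDataTypeQuery.ifPresent(q -> conn.evolveIcebergColumns(tableName, q));
  }
}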
@@ -31,7 +31,6 @@
import com.snowflake.kafka.connector.internal.streaming.schemaevolution.InsertErrorMapper;
import com.snowflake.kafka.connector.internal.streaming.schemaevolution.SchemaEvolutionService;
import com.snowflake.kafka.connector.internal.streaming.schemaevolution.SchemaEvolutionTargetItems;
import com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg.IcebergSchemaEvolutionService;
import com.snowflake.kafka.connector.internal.streaming.telemetry.SnowflakeTelemetryChannelCreation;
import com.snowflake.kafka.connector.internal.streaming.telemetry.SnowflakeTelemetryChannelStatus;
import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryService;
@@ -537,7 +536,8 @@ private void handleInsertRowFailure(
SchemaEvolutionTargetItems schemaEvolutionTargetItems =
insertErrorMapper.mapToSchemaEvolutionItems(insertError, this.channel.getTableName());
if (schemaEvolutionTargetItems.hasDataForSchemaEvolution()) {
schemaEvolutionService.evolveSchemaIfNeeded(schemaEvolutionTargetItems, kafkaSinkRecord, channel.getTableSchema());
schemaEvolutionService.evolveSchemaIfNeeded(
schemaEvolutionTargetItems, kafkaSinkRecord, channel.getTableSchema());
streamingApiFallbackSupplier(
StreamingApiFallbackInvoker.INSERT_ROWS_SCHEMA_EVOLUTION_FALLBACK);
return;
@@ -144,7 +144,8 @@ public SnowflakeSinkServiceV2(
Utils.isIcebergEnabled(connectorConfig), schematizationEnabled);
this.icebergTableSchemaValidator = new IcebergTableSchemaValidator(conn);
this.icebergInitService = new IcebergInitService(conn);
this.schemaEvolutionService = Utils.isIcebergEnabled(connectorConfig)
this.schemaEvolutionService =
Utils.isIcebergEnabled(connectorConfig)
? new IcebergSchemaEvolutionService(conn)
: new SnowflakeSchemaEvolutionService(conn);

@@ -1,10 +1,9 @@
package com.snowflake.kafka.connector.internal.streaming.schemaevolution;

import java.util.Map;
import net.snowflake.ingest.streaming.internal.ColumnProperties;
import org.apache.kafka.connect.sink.SinkRecord;

import java.util.Map;

public interface SchemaEvolutionService {

/**
@@ -15,5 +14,8 @@ public interface SchemaEvolutionService {
* nullability, and columns to add
* @param record the sink record that contains the schema and actual data
*/
void evolveSchemaIfNeeded(SchemaEvolutionTargetItems targetItems, SinkRecord record, Map<String, ColumnProperties> schemaAlreadyInUse);
void evolveSchemaIfNeeded(
SchemaEvolutionTargetItems targetItems,
SinkRecord record,
Map<String, ColumnProperties> schemaAlreadyInUse);
}
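The interface now receives the table's existing column properties alongside the record. A minimal implementation skeleton against the widened signature (the class name and empty body are hypothetical, shown only to illustrate the new parameter):

import java.util.Map;
import net.snowflake.ingest.streaming.internal.ColumnProperties;
import org.apache.kafka.connect.sink.SinkRecord;

// Hypothetical skeleton, not part of this commit.
class NoOpSchemaEvolutionService implements SchemaEvolutionService {

  @Override
  public void evolveSchemaIfNeeded(
      SchemaEvolutionTargetItems targetItems,
      SinkRecord record,
      Map<String, ColumnProperties> schemaAlreadyInUse) {
    // schemaAlreadyInUse lets an implementation compare the incoming record's schema
    // with the column types already present in the table before issuing any ALTER.
  }
}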
@@ -48,6 +48,26 @@ private IcebergFieldNode(String name, String snowflakeIcebergType) {
this.children = new LinkedHashMap<>();
}

/**
* @param sb StringBuilder
* @param parentType Snowflake Iceberg table compatible type. If a root node is a parent then
* "ROOT_NODE" is passed, because we always generate root nodes column name.
* @return field name + data type
*/
StringBuilder buildQuery(StringBuilder sb, String parentType) {
if (parentType.equals("ARRAY") || parentType.equals("MAP") || parentType.equals("ROOT_NODE")) {
sb.append(snowflakeIcebergType);
} else {
appendNameAndType(sb);
}
if (!children.isEmpty()) {
sb.append("(");
appendChildren(sb, this.snowflakeIcebergType);
sb.append(")");
}
return sb;
}

/**
* Method does not modify, delete any existing nodes and its types, names. It is meant only to add
* new children.
@@ -170,26 +190,6 @@ private IcebergFieldNode fromNestedField(Types.NestedField field) {
return new IcebergFieldNode(field.name(), field.type());
}

/**
* @param sb StringBuilder
* @param parentType Snowflake Iceberg table compatible type. If a root node is a parent then
* "ROOT_NODE" is passed, because we always generate root nodes column name.
* @return field name + data type
*/
StringBuilder buildQuery(StringBuilder sb, String parentType) {
if (parentType.equals("ARRAY") || parentType.equals("MAP") || parentType.equals("ROOT_NODE")) {
sb.append(snowflakeIcebergType);
} else {
appendNameAndType(sb);
}
if (!children.isEmpty()) {
sb.append("(");
appendChildren(sb, this.snowflakeIcebergType);
sb.append(")");
}
return sb;
}

private void appendNameAndType(StringBuilder sb) {
sb.append(name);
sb.append(" ");
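The relocated buildQuery recursion emits a Snowflake-compatible type string: children of ARRAY and MAP nodes (and the root node itself) contribute only their type, while children of OBJECT nodes contribute name-and-type pairs. A self-contained sketch of that idea (the class, field wiring, and child-joining logic below are assumptions, not the connector's code), producing the same shape of output the parser tests later in this commit expect:

import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative stand-in for the field-node recursion; names and joining logic are assumed.
class FieldNodeSketch {
  final String name;
  final String snowflakeIcebergType;
  final Map<String, FieldNodeSketch> children = new LinkedHashMap<>();

  FieldNodeSketch(String name, String snowflakeIcebergType) {
    this.name = name;
    this.snowflakeIcebergType = snowflakeIcebergType;
  }

  StringBuilder buildQuery(StringBuilder sb, String parentType) {
    // Children of ARRAY/MAP (and the root node itself) contribute only their type;
    // children of OBJECT contribute "name type" pairs.
    if (parentType.equals("ARRAY") || parentType.equals("MAP") || parentType.equals("ROOT_NODE")) {
      sb.append(snowflakeIcebergType);
    } else {
      sb.append(name).append(" ").append(snowflakeIcebergType);
    }
    if (!children.isEmpty()) {
      sb.append("(");
      String separator = "";
      for (FieldNodeSketch child : children.values()) {
        sb.append(separator);
        child.buildQuery(sb, snowflakeIcebergType);
        separator = ", ";
      }
      sb.append(")");
    }
    return sb;
  }

  public static void main(String[] args) {
    FieldNodeSketch object = new FieldNodeSketch("element", "OBJECT");
    object.children.put("name", new FieldNodeSketch("name", "VARCHAR"));
    object.children.put("id", new FieldNodeSketch("id", "LONG"));
    FieldNodeSketch root = new FieldNodeSketch("testColumnName", "ARRAY");
    root.children.put("element", object);
    // Prints: ARRAY(OBJECT(name VARCHAR, id LONG))
    System.out.println(root.buildQuery(new StringBuilder(), "ROOT_NODE"));
  }
}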
@@ -97,7 +97,8 @@ public void evolveSchemaIfNeeded(
});
Optional<String> alterSetDataTypeQuery = alterSetDataTypeQuery(alreadyExistingColumns);
try {
conn.appendColumnsToIcebergTable(tableName, addColumnsQuery, alterSetDataTypeQuery);
addColumnsQuery.ifPresent(query -> conn.evolveIcebergColumns(tableName, query));
alterSetDataTypeQuery.ifPresent(query -> conn.evolveIcebergColumns(tableName, query));
} catch (SnowflakeKafkaConnectorException e) {
LOGGER.warn(
String.format(
@@ -9,7 +9,6 @@
import com.snowflake.kafka.connector.internal.streaming.schemaevolution.TableSchemaResolver;
import java.util.List;
import java.util.Map;

import net.snowflake.ingest.streaming.internal.ColumnProperties;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
@@ -45,7 +44,10 @@ public SnowflakeSchemaEvolutionService(SnowflakeConnectionService conn) {
* @param schemaAlreadyInUse is unused in this implementation
*/
@Override
public void evolveSchemaIfNeeded(SchemaEvolutionTargetItems targetItems, SinkRecord record, Map<String, ColumnProperties> schemaAlreadyInUse) {
public void evolveSchemaIfNeeded(
SchemaEvolutionTargetItems targetItems,
SinkRecord record,
Map<String, ColumnProperties> schemaAlreadyInUse) {
String tableName = targetItems.getTableName();
List<String> columnsToDropNullability = targetItems.getColumnsToDropNonNullability();
// Update nullability if needed, ignore any exceptions since other task might be succeeded
@@ -27,7 +27,6 @@
import com.snowflake.kafka.connector.internal.streaming.channel.TopicPartitionChannel;
import com.snowflake.kafka.connector.internal.streaming.schemaevolution.InsertErrorMapper;
import com.snowflake.kafka.connector.internal.streaming.schemaevolution.SchemaEvolutionService;
import com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg.IcebergSchemaEvolutionService;
import com.snowflake.kafka.connector.internal.streaming.telemetry.SnowflakeTelemetryChannelCreation;
import com.snowflake.kafka.connector.internal.streaming.telemetry.SnowflakeTelemetryChannelStatus;
import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryService;
@@ -22,7 +22,8 @@ class IcebergColumnTypeMapperTest {
@MethodSource("kafkaTypesToMap")
void shouldMapKafkaTypeToSnowflakeColumnType(
Schema.Type kafkaType, String schemaName, String expectedSnowflakeType) {
assertThat(mapper.mapToColumnTypeFromKafkaSchema(kafkaType, schemaName)).isEqualTo(expectedSnowflakeType);
assertThat(mapper.mapToColumnTypeFromKafkaSchema(kafkaType, schemaName))
.isEqualTo(expectedSnowflakeType);
}

@ParameterizedTest()
@@ -149,8 +149,7 @@ static Stream<Arguments> parseFromJsonArguments() {
"ARRAY(OBJECT(name VARCHAR, id LONG))"),
// array
arguments("{\"testColumnName\": [1,2,3] }", "ARRAY(LONG)"),
arguments("{ \"testColumnName\": [] }", "ARRAY(VARCHAR(16777216))")
);
arguments("{ \"testColumnName\": [] }", "ARRAY(VARCHAR(16777216))"));
}

@ParameterizedTest
@@ -4,7 +4,6 @@
import static org.assertj.core.api.Assertions.assertThat;

import com.snowflake.kafka.connector.Utils;
import com.snowflake.kafka.connector.internal.DescribeTableRow;
import com.snowflake.kafka.connector.streaming.iceberg.sql.ComplexJsonRecord;
import com.snowflake.kafka.connector.streaming.iceberg.sql.MetadataRecord;
import com.snowflake.kafka.connector.streaming.iceberg.sql.MetadataRecord.RecordWithMetadata;
@@ -13,6 +12,7 @@
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
@@ -80,7 +80,7 @@ private static Stream<Arguments> prepareData() {

@ParameterizedTest(name = "{0}")
@MethodSource("prepareData")
// @Disabled
@Disabled
void shouldInsertRecords(String description, String message, boolean withSchema)
throws Exception {
service.insert(
@@ -90,7 +90,6 @@ void shouldInsertRecords(String description, String message, boolean withSchema)
service.insert(Collections.singletonList(createKafkaRecord(message, 2, withSchema)));
waitForOffset(3);

List<DescribeTableRow> columns = describeTable(tableName);
assertRecordsInTable();
}

