Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNOW-1729292 modify iceberg tree based on record data #1007

Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
ccc6bf3
SNOW-1729292 Modify the tree based on record data
sfc-gh-bzabek Nov 18, 2024
c1d1703
parse iceberg chemas and distinguish columns
sfc-gh-bzabek Nov 19, 2024
bebfeef
refactor and encapsulate classes
sfc-gh-bzabek Nov 19, 2024
9bf7180
revert it
sfc-gh-bzabek Nov 19, 2024
c8ae000
can alter STRUCT column
sfc-gh-bzabek Nov 19, 2024
711e221
test to verify we can evolve advanced structures
sfc-gh-bzabek Nov 19, 2024
4ec9c33
refactor test for structures inserted for the first time
sfc-gh-bzabek Nov 19, 2024
e6b8c06
implement merge tree logic
sfc-gh-bzabek Nov 20, 2024
6f9f2fa
first succesful evolution of STRUCTURE
sfc-gh-bzabek Nov 20, 2024
a1adeb6
refactor methods for generating queries
sfc-gh-bzabek Nov 21, 2024
3dd80cb
refactor, remove ApacheIcebergSchema class
sfc-gh-bzabek Nov 21, 2024
b924e3a
remove ingest-sdk jar
sfc-gh-bzabek Nov 21, 2024
f8dc4a0
nit
sfc-gh-bzabek Nov 21, 2024
1321df6
javadoc
sfc-gh-bzabek Nov 21, 2024
9110f6a
nits
sfc-gh-bzabek Nov 21, 2024
eab2354
handle ARRAY evolution
sfc-gh-bzabek Nov 22, 2024
660a15f
fix NPE for array: null
sfc-gh-bzabek Nov 26, 2024
69de2aa
support records with schema
sfc-gh-bzabek Nov 26, 2024
6144419
use optional instead of empty string after generating a query
sfc-gh-bzabek Nov 26, 2024
4f37bee
make IcebergSchemaEvolutionService implement interface again
sfc-gh-bzabek Nov 26, 2024
5f00f8f
refactor ConnectionServiceV1
sfc-gh-bzabek Nov 26, 2024
da4bbf9
refactor the way we execute query, plus format.sh
sfc-gh-bzabek Nov 26, 2024
36f6f02
add more tests for scenarios with schema, fix schema resolver
sfc-gh-bzabek Nov 27, 2024
b1108f8
add and parametrize IT tests
sfc-gh-bzabek Nov 27, 2024
2cac1db
minor tests rework
sfc-gh-bzabek Nov 28, 2024
8d9cbf6
refactor SchemaEvolutionService
sfc-gh-bzabek Nov 28, 2024
49cabc9
rename ApacheIcebergColumnSchema to IcebergColumnSchema
sfc-gh-bzabek Nov 28, 2024
08c103c
generate query in a connection service
sfc-gh-bzabek Nov 28, 2024
7ff97e7
nit: change order of the methods
sfc-gh-bzabek Nov 28, 2024
665dbb8
detach create and merge node logic to services
sfc-gh-bzabek Nov 29, 2024
5d5df77
detach buildType logic into a service
sfc-gh-bzabek Nov 29, 2024
3666b25
self review improvements
sfc-gh-bzabek Nov 29, 2024
f918bd0
apply changes suggested in a review
sfc-gh-bzabek Dec 2, 2024
7ff742a
disable tests
sfc-gh-bzabek Dec 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,19 @@ public interface SnowflakeConnectionService {
*/
void appendColumnsToTable(String tableName, Map<String, ColumnInfos> columnInfosMap);

/**
* Alter iceberg table to modify columns datatype
*
* @param tableName the name of the table
* @param columnInfosMap the mapping from the columnNames to their infos
*/
void alterColumnsDataTypeIcebergTable(String tableName, Map<String, ColumnInfos> columnInfosMap);

/**
* Alter iceberg table to add columns according to a map from columnNames to their types
*
* @param tableName the name of the table
* @param columnInfosMap the mapping from the columnNames to their columnInfos
* @param columnInfosMap the mapping from the columnNames to their infos
*/
void appendColumnsToIcebergTable(String tableName, Map<String, ColumnInfos> columnInfosMap);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,36 @@ public boolean hasSchemaEvolutionPermission(String tableName, String role) {
return hasPermission;
}

/**
* Alter iceberg table to modify columns datatype
*
* @param tableName the name of the table
* @param columnInfosMap the mapping from the columnNames to their infos
*/
@Override
public void alterColumnsDataTypeIcebergTable(
String tableName, Map<String, ColumnInfos> columnInfosMap) {
LOGGER.debug("Modifying data types of iceberg table columns");
String alterSetDatatypeQuery = generateAlterSetDataTypeQuery(columnInfosMap);
executeStatement(tableName, alterSetDatatypeQuery);
}

private String generateAlterSetDataTypeQuery(Map<String, ColumnInfos> columnsToModify) {
StringBuilder setDataTypeQuery = new StringBuilder("alter iceberg ");
setDataTypeQuery.append("table identifier(?) alter column ");
for (Map.Entry<String, ColumnInfos> column : columnsToModify.entrySet()) {
String columnName = column.getKey();
String dataType = column.getValue().getColumnType();

setDataTypeQuery.append(columnName).append(" set data type ").append(dataType).append(", ");
}
// remove last comma and whitespace
setDataTypeQuery.deleteCharAt(setDataTypeQuery.length() - 1);
sfc-gh-bzabek marked this conversation as resolved.
Show resolved Hide resolved
setDataTypeQuery.deleteCharAt(setDataTypeQuery.length() - 1);

return setDataTypeQuery.toString();
}

/**
* Alter table to add columns according to a map from columnNames to their types
*
Expand All @@ -523,6 +553,18 @@ public void appendColumnsToIcebergTable(
appendColumnsToTable(tableName, columnInfosMap, true);
}

private void executeStatement(String tableName, String query) {
try {
LOGGER.info("Trying to run query: {}", query);
PreparedStatement stmt = conn.prepareStatement(query);
stmt.setString(1, tableName);
stmt.execute();
stmt.close();
} catch (SQLException e) {
throw SnowflakeErrors.ERROR_2015.getException(e);
}
}

private void appendColumnsToTable(
String tableName, Map<String, ColumnInfos> columnInfosMap, boolean isIcebergTable) {
checkConnection();
Expand Down Expand Up @@ -552,15 +594,7 @@ private void appendColumnsToTable(
logColumn.append(columnName).append(" (").append(columnInfosMap.get(columnName)).append(")");
}

try {
LOGGER.info("Trying to run query: {}", appendColumnQuery.toString());
PreparedStatement stmt = conn.prepareStatement(appendColumnQuery.toString());
stmt.setString(1, tableName);
stmt.execute();
stmt.close();
} catch (SQLException e) {
throw SnowflakeErrors.ERROR_2015.getException(e);
}
executeStatement(tableName, appendColumnQuery.toString());

logColumn.insert(0, "Following columns created for table {}:\n").append("]");
sfc-gh-mbobowski marked this conversation as resolved.
Show resolved Hide resolved
LOGGER.info(logColumn.toString(), tableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -684,7 +684,8 @@ public InsertRowsResponse get() throws Throwable {
LOGGER.info("Triggering schema evolution. Items: {}", schemaEvolutionTargetItems);
schemaEvolutionService.evolveSchemaIfNeeded(
schemaEvolutionTargetItems,
this.insertRowsStreamingBuffer.getSinkRecord(originalSinkRecordIdx));
this.insertRowsStreamingBuffer.getSinkRecord(originalSinkRecordIdx),
channel.getTableSchema());
// Offset reset needed since it's possible that we successfully ingested partial batch
needToResetOffset = true;
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,7 @@
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import net.snowflake.ingest.streaming.InsertValidationResponse;
import net.snowflake.ingest.streaming.OpenChannelRequest;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;
import net.snowflake.ingest.streaming.*;
import net.snowflake.ingest.utils.SFException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.data.Schema;
Expand Down Expand Up @@ -539,7 +536,8 @@ private void handleInsertRowFailure(
SchemaEvolutionTargetItems schemaEvolutionTargetItems =
insertErrorMapper.mapToSchemaEvolutionItems(insertError, this.channel.getTableName());
if (schemaEvolutionTargetItems.hasDataForSchemaEvolution()) {
schemaEvolutionService.evolveSchemaIfNeeded(schemaEvolutionTargetItems, kafkaSinkRecord);
schemaEvolutionService.evolveSchemaIfNeeded(
schemaEvolutionTargetItems, kafkaSinkRecord, channel.getTableSchema());
streamingApiFallbackSupplier(
StreamingApiFallbackInvoker.INSERT_ROWS_SCHEMA_EVOLUTION_FALLBACK);
return;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.snowflake.kafka.connector.internal.streaming.schemaevolution;

import java.util.Map;
import net.snowflake.ingest.streaming.internal.ColumnProperties;
import org.apache.kafka.connect.sink.SinkRecord;

public interface SchemaEvolutionService {
Expand All @@ -12,5 +14,8 @@ public interface SchemaEvolutionService {
* nullability, and columns to add
* @param record the sink record that contains the schema and actual data
*/
void evolveSchemaIfNeeded(SchemaEvolutionTargetItems targetItems, SinkRecord record);
void evolveSchemaIfNeeded(
SchemaEvolutionTargetItems targetItems,
SinkRecord record,
Map<String, ColumnProperties> existingSchema);
sfc-gh-mbobowski marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg;

import com.fasterxml.jackson.databind.JsonNode;
import java.util.Map;

class IcebergColumnJsonValuePair {
private final String columnName;
private final JsonNode jsonNode;

static IcebergColumnJsonValuePair from(Map.Entry<String, JsonNode> field) {
return new IcebergColumnJsonValuePair(field.getKey(), field.getValue());
}

IcebergColumnJsonValuePair(String columnName, JsonNode jsonNode) {
this.columnName = columnName;
this.jsonNode = jsonNode;
}

String getColumnName() {
return columnName;
}

JsonNode getJsonNode() {
return jsonNode;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,22 @@
import org.apache.iceberg.types.Type;

/** Wrapper class for Iceberg schema retrieved from channel. */
public class ApacheIcebergColumnSchema {
class IcebergColumnSchema {

private final Type schema;

private final String columnName;

public ApacheIcebergColumnSchema(Type schema, String columnName) {
IcebergColumnSchema(Type schema, String columnName) {
this.schema = schema;
this.columnName = columnName.toUpperCase();
this.columnName = columnName;
}

public Type getSchema() {
Type getSchema() {
return schema;
}

public String getColumnName() {
String getColumnName() {
return columnName;
}
}
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
package com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg;

/** Class with object types compatible with Snowflake Iceberg table */
public class IcebergColumnTree {
class IcebergColumnTree {

private final IcebergFieldNode rootNode;

public IcebergColumnTree(ApacheIcebergColumnSchema columnSchema) {
this.rootNode = new IcebergFieldNode(columnSchema.getColumnName(), columnSchema.getSchema());
String getColumnName() {
return rootNode.name;
}

public String buildQuery() {
StringBuilder sb = new StringBuilder();
return rootNode.buildQuery(sb, "ROOT_NODE").toString();
IcebergFieldNode getRootNode() {
return rootNode;
}

IcebergColumnTree(IcebergFieldNode rootNode) {
this.rootNode = rootNode;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
package com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.snowflake.kafka.connector.internal.KCLogger;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;

public class IcebergColumnTreeFactory {

private final KCLogger LOGGER = new KCLogger(IcebergColumnTreeFactory.class.getName());

private final IcebergColumnTypeMapper mapper;

public IcebergColumnTreeFactory() {
this.mapper = new IcebergColumnTypeMapper();
}

IcebergColumnTree fromIcebergSchema(IcebergColumnSchema columnSchema) {
LOGGER.debug("Attempting to resolve schema from schema stored in a channel");
IcebergFieldNode rootNode =
createNode(columnSchema.getColumnName().toUpperCase(), columnSchema.getSchema());
return new IcebergColumnTree(rootNode);
}

IcebergColumnTree fromJson(IcebergColumnJsonValuePair pair) {
LOGGER.debug("Attempting to resolve schema from records payload");
IcebergFieldNode rootNode = createNode(pair.getColumnName().toUpperCase(), pair.getJsonNode());
return new IcebergColumnTree(rootNode);
}

IcebergColumnTree fromConnectSchema(Field kafkaConnectField) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have two separated flows: JSON without schema and AVRO/Protobuf.

I would extract fromConnectSchema to a separate class SchematizedIcebergColumnTreeFactory and rename this class to NoSchemaIcebergColumnTreeFactory. WDYT?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, so what we have:

  • from Iceberg (channel) schema - it's used before both workflows
  • from json payload - workflow without schema
  • from record schema - now there is only fromConnectSchema method. Do we have to also write seperate logic to parse avro and protobuf schema? I think not because Converter will parse it into connect schema when we ancounter AVRO or Protobuf. (For sure we ust test it)

When I wrote the factory, I thought it will be a bit over engineering to split it. However if we are going to need more methods then sure. Logically we have 3 parts.

LOGGER.debug("Attempting to resolve schema from schema attached to a record");
IcebergFieldNode rootNode =
createNode(kafkaConnectField.name().toUpperCase(), kafkaConnectField.schema());
return new IcebergColumnTree(rootNode);
}

// -- parse tree from Iceberg schema logic --
private IcebergFieldNode createNode(String name, Type apacheIcebergSchema) {
String snowflakeType = mapper.mapToColumnTypeFromIcebergSchema(apacheIcebergSchema);
return new IcebergFieldNode(name, snowflakeType, produceChildren(apacheIcebergSchema));
}

private LinkedHashMap<String, IcebergFieldNode> produceChildren(Type apacheIcebergSchema) {
// primitives must not have children
if (apacheIcebergSchema.isPrimitiveType()) {
return new LinkedHashMap<>();
}
Type.NestedType nestedField = apacheIcebergSchema.asNestedType();
return nestedField.fields().stream()
.collect(
Collectors.toMap(
Types.NestedField::name,
field -> createNode(field.name(), field.type()),
// It's impossible to have two same keys
(v1, v2) -> {
throw new IllegalArgumentException("Two same keys: " + v1);
},
LinkedHashMap::new));
}

// -- parse tree from kafka record payload logic --
private IcebergFieldNode createNode(String name, JsonNode jsonNode) {
String snowflakeType = mapper.mapToColumnTypeFromJson(jsonNode);
return new IcebergFieldNode(name, snowflakeType, produceChildren(jsonNode));
}

private LinkedHashMap<String, IcebergFieldNode> produceChildren(JsonNode recordNode) {
if (recordNode.isNull()) {
return new LinkedHashMap<>();
}
if (recordNode.isArray()) {
ArrayNode arrayNode = (ArrayNode) recordNode;
return produceChildrenFromArray(arrayNode);
}
if (recordNode.isObject()) {
ObjectNode objectNode = (ObjectNode) recordNode;
return produceChildrenFromObject(objectNode);
}
return new LinkedHashMap<>();
}

private LinkedHashMap<String, IcebergFieldNode> produceChildrenFromArray(ArrayNode arrayNode) {
JsonNode arrayElement = arrayNode.get(0);
// VARCHAR is set for an empty array: [] -> ARRAY(VARCHAR)
if (arrayElement == null) {
LinkedHashMap<String, IcebergFieldNode> child = new LinkedHashMap<>();
child.put(
"element", new IcebergFieldNode("element", "VARCHAR(16777216)", new LinkedHashMap<>()));
return child;
}
LinkedHashMap<String, IcebergFieldNode> child = new LinkedHashMap<>();
child.put("element", createNode("element", arrayElement));
return child;
}

private LinkedHashMap<String, IcebergFieldNode> produceChildrenFromObject(ObjectNode objectNode) {
return objectNode.properties().stream()
.collect(
Collectors.toMap(
Map.Entry::getKey,
stringJsonNodeEntry ->
createNode(stringJsonNodeEntry.getKey(), stringJsonNodeEntry.getValue()),
(v1, v2) -> {
throw new IllegalArgumentException("Two same keys: " + v1);
},
LinkedHashMap::new));
}

// -- parse tree from kafka record schema logic --
private IcebergFieldNode createNode(String name, Schema schema) {
String snowflakeType =
mapper.mapToColumnTypeFromKafkaSchema(schema.schema().type(), schema.schema().name());
return new IcebergFieldNode(name, snowflakeType, produceChildren(schema.schema()));
}

private LinkedHashMap<String, IcebergFieldNode> produceChildren(Schema connectSchema) {
if (connectSchema.type() == Schema.Type.STRUCT) {
return produceChildrenFromStruct(connectSchema);
}
if (connectSchema.type() == Schema.Type.MAP) {
return produceChildrenFromMap(connectSchema);
}
if (connectSchema.type() == Schema.Type.ARRAY) {
return produceChildrenForArray(connectSchema);
} else { // isPrimitive == true
return new LinkedHashMap<>();
}
}

private LinkedHashMap<String, IcebergFieldNode> produceChildrenForArray(
Schema connectSchemaForArray) {
LinkedHashMap<String, IcebergFieldNode> child = new LinkedHashMap<>();
child.put("element", createNode("element", connectSchemaForArray.valueSchema()));
return child;
}

private LinkedHashMap<String, IcebergFieldNode> produceChildrenFromStruct(Schema connectSchema) {
return connectSchema.fields().stream()
.collect(
Collectors.toMap(
Field::name,
f -> createNode(f.name(), f.schema()),
(v1, v2) -> {
throw new IllegalArgumentException("Two same keys: " + v1);
},
LinkedHashMap::new));
}

private LinkedHashMap<String, IcebergFieldNode> produceChildrenFromMap(Schema connectSchema) {
LinkedHashMap<String, IcebergFieldNode> keyValue = new LinkedHashMap<>();
// these names will not be used when creating a query
keyValue.put("key", createNode("key", connectSchema.keySchema()));
keyValue.put("value", createNode("value", connectSchema.valueSchema()));
return keyValue;
}
}
Loading
Loading