-
Notifications
You must be signed in to change notification settings - Fork 100
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SNOW-1729292 modify iceberg tree based on record data #1007
SNOW-1729292 modify iceberg tree based on record data #1007
Conversation
...e/kafka/connector/internal/streaming/schemaevolution/iceberg/ParseIcebergColumnTreeTest.java
Show resolved
Hide resolved
src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java
Outdated
Show resolved
Hide resolved
src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java
Outdated
Show resolved
Hide resolved
.../snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTree.java
Outdated
Show resolved
Hide resolved
...lake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTypeMapper.java
Show resolved
Hide resolved
...lake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTypeMapper.java
Show resolved
Hide resolved
...lake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTypeMapper.java
Outdated
Show resolved
Hide resolved
...m/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergFieldNode.java
Outdated
Show resolved
Hide resolved
...m/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergFieldNode.java
Outdated
Show resolved
Hide resolved
...m/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergFieldNode.java
Show resolved
Hide resolved
|
||
public IcebergSchemaEvolutionService(SnowflakeConnectionService conn) { | ||
this.conn = conn; | ||
this.tableSchemaResolver = new IcebergTableSchemaResolver(); | ||
this.icebergTableSchemaResolver = new IcebergTableSchemaResolver(); | ||
} | ||
|
||
@VisibleForTesting |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually it is not used anymore. You can safely delete it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm confused. I use icebergTableSchemaResolver
a couple of times. (?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean the constructor below marked with @VisibleForTesting
(not affected by your changes).
...e/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergTableSchemaResolver.java
Show resolved
Hide resolved
.../java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java
Outdated
Show resolved
Hide resolved
import org.junit.jupiter.params.provider.Arguments; | ||
import org.junit.jupiter.params.provider.MethodSource; | ||
|
||
public class ParseIcebergColumnTreeTest { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This class was moved to a correct module.
src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java
Outdated
Show resolved
Hide resolved
src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java
Show resolved
Hide resolved
...com/snowflake/kafka/connector/internal/streaming/schemaevolution/SchemaEvolutionService.java
Show resolved
Hide resolved
return new IcebergColumnTree(rootNode); | ||
} | ||
|
||
IcebergColumnTree fromConnectSchema(Field kafkaConnectField) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have two separated flows: JSON without schema and AVRO/Protobuf.
I would extract fromConnectSchema
to a separate class SchematizedIcebergColumnTreeFactory
and rename this class to NoSchemaIcebergColumnTreeFactory
. WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, so what we have:
- from Iceberg (channel) schema - it's used before both workflows
- from json payload - workflow without schema
- from record schema - now there is only fromConnectSchema method. Do we have to also write seperate logic to parse avro and protobuf schema? I think not because Converter will parse it into connect schema when we ancounter AVRO or Protobuf. (For sure we ust test it)
When I wrote the factory, I thought it will be a bit over engineering to split it. However if we are going to need more methods then sure. Logically we have 3 parts.
...kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTreeTypeBuilder.java
Outdated
Show resolved
Hide resolved
List<IcebergColumnTree> modifiedOrAddedColumns = | ||
icebergTableSchemaResolver.resolveIcebergSchemaFromRecord(record, columnsToEvolve); | ||
|
||
List<IcebergColumnTree> columnsToAdd = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is no need to iterate twice over the list. The columns from modifiedOrAddedColumns
are either modified or added, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Righ, however I found my approach simpler and didn't care very much about performance of that.
Create 2 lists and write and if to put an element into one or another list.
private Pair<List<IcebergColumnTree>, List<IcebergColumnTree>> distinguish(
List<IcebergColumnTree> alreadyExistingColumns,
List<IcebergColumnTree> modifiedOrAddedColumns) {
ArrayList<IcebergColumnTree> columnsToModify = new ArrayList<>();
ArrayList<IcebergColumnTree> columnsToAdd = new ArrayList<>();
for (IcebergColumnTree tree : modifiedOrAddedColumns) {
if (alreadyExistingColumns.stream()
.anyMatch(alreadyExisting -> alreadyExisting.getColumnName()
.equalsIgnoreCase(tree.getColumnName()))) {
columnsToModify.add(tree);
} else {
columnsToAdd.add(tree);
}
}
return Pair<List<IcebergColumnTree>, List<IcebergColumnTree>>(columnsToAdd, columnsToModify);
}
But there is not "Pair" in JDK8.
This approach is a bit more messy. Slightly more performant.
(It's messy ->) The second alternative I see is to distinguish columnsToAdd, and then having columnsToAdd iterate over columns to evolve and again match them with modifiedOrAddedColumns list.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
None is perfect.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For me it's a matter of avoiding code duplication rather than optimizing the code execution time. I don't see any problem with creating a wrapper class for two lists.
Anyway it's not a blocker to me.
.filter( | ||
modifiedOrAddedColumn -> | ||
alreadyExistingColumns.stream() | ||
.noneMatch( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be easier to convert alreadyExistingColumns
from List to Set.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can use Set everywhere instead of a list.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But for IcebergColumnTree I didn't implement equals nor hashCode. I don't feel it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. Is there any problem with implementing these methods or the final code doesn't look cleaner?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we implement only for columnName - there is not a problem. Using it for both doesn't make sense.
I don't think using Set will change anything in a logic. It may (shouldn't) sneakily replace a column somewhere... I can change it, never say never.
...afka/connector/internal/streaming/schemaevolution/iceberg/IcebergSchemaEvolutionService.java
Outdated
Show resolved
Hide resolved
...e/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergTableSchemaResolver.java
Show resolved
Hide resolved
...e/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergTableSchemaResolver.java
Outdated
Show resolved
Hide resolved
...e/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergTableSchemaResolver.java
Outdated
Show resolved
Hide resolved
...e/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergTableSchemaResolver.java
Outdated
Show resolved
Hide resolved
...e/kafka/connector/internal/streaming/schemaevolution/iceberg/ParseIcebergColumnTreeTest.java
Outdated
Show resolved
Hide resolved
...e/kafka/connector/internal/streaming/schemaevolution/iceberg/ParseIcebergColumnTreeTest.java
Outdated
Show resolved
Hide resolved
+ "}"; | ||
} | ||
|
||
static String nestedObjectsPayload = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it can be private
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's keep it consise with others. I don't want to restrict it's usage.
src/test/java/com/snowflake/kafka/connector/streaming/iceberg/TestJsons.java
Outdated
Show resolved
Hide resolved
false)); | ||
} | ||
|
||
private static final String RECORD_METADATA_TYPE = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's move it at the beginning of the class.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I still see some space for minor improvements, but I am ok with merging it at this point.
@@ -51,6 +49,8 @@ public void setUp() { | |||
config.put(ICEBERG_ENABLED, "TRUE"); | |||
config.put(ENABLE_SCHEMATIZATION_CONFIG, isSchemaEvolutionEnabled().toString()); | |||
config.put(SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER, "true"); | |||
// "snowflake.streaming.max.client.lag" = 1 second, for faster tests | |||
config.put(SNOWPIPE_STREAMING_MAX_CLIENT_LAG, "1"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking much better now, really like how you split the IcebergFieldNode class. Good job!
Overview
SNOW-1729292
There is still a lot work to be done. Tell me if you find this approach good.
The whole logic responsible for iceberg schema evolution is package private. Only IcebergSchemaEvolutionService is public.
It doesn't work with ingest-sdk 3.0.0, local jar needs to be build from ingest-sdk master.
TODO:
Pre-review checklist
snowflake.ingestion.method
.Yes
- Added end to end and Unit Tests.No
- Suggest why it is not param protected