feat: Added support to write iceberg tables #5989
Conversation
properties.put(CatalogProperties.CATALOG_IMPL, catalog.getClass().getName());
properties.put(CatalogProperties.URI, catalogURI);
properties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouseLocation);

// Following is needed to write new manifest files when writing new data.
// Not setting this will result in using ResolvingFileIO.
properties.put(CatalogProperties.FILE_IO_IMPL, S3FileIO.class.getName());
Why is it a problem to use ResolvingFileIO? You will need to provide HadoopConf info.
So from what I understood, ResolvingFileIO would add an additional step of resolving which file IO to use. And based on the file name, I thought we can be sure here that it's in S3. That's why I thought of using S3FileIO. Does that sound reasonable?
I understand your point; we clearly know that this should resolve to S3FileIO. In all other scenarios, though, we've trusted the Iceberg API to resolve correctly, and I'd be happier to stick with that. I don't feel strongly about this, however.
In this context, I'm happy to have S3FileIO specified. In general, though, I think we are leaning away from providing these "pre-configured" entrypoints for the user and prefer that they go through the generic catalog creation, in which case I would argue that we might want to deprecate IcebergToolsS3.
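For context, a minimal sketch of what the generic catalog-creation path could look like; the catalog implementation, `catalogUri`, and `warehouseLocation` values are placeholders, and `CatalogUtil.buildIcebergCatalog` is Iceberg's generic factory. Everything is driven by the properties map, and `FILE_IO_IMPL` is just one more optional property, with ResolvingFileIO as the fallback when it is omitted:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.aws.s3.S3FileIO;
import org.apache.iceberg.catalog.Catalog;

final class GenericCatalogSketch {
    // Sketch only: build a catalog purely from a properties map.
    static Catalog buildCatalog(final String catalogUri, final String warehouseLocation) {
        final Map<String, String> properties = new HashMap<>();
        properties.put(CatalogProperties.CATALOG_IMPL, "org.apache.iceberg.rest.RESTCatalog"); // example impl
        properties.put(CatalogProperties.URI, catalogUri);
        properties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouseLocation);
        // Optional: pin the FileIO. Omitting this line leaves Iceberg's ResolvingFileIO in charge.
        properties.put(CatalogProperties.FILE_IO_IMPL, S3FileIO.class.getName());
        return CatalogUtil.buildIcebergCatalog("my-catalog", properties, new Configuration());
    }
}
```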
 * @param dhTable The deephaven table to append
 * @param instructions The instructions for customizations while writing, or null to use default instructions
 */
public void append(
We aren't currently providing a way to add partitioned data to an Iceberg table, but we should create a ticket for this functionality.
Will check with Ryan/Devin whether I should add this as part of this PR itself, or start a separate ticket/PR.
If we decide to do it here, I would need a bit more clarity on the API.
The main difference from non-partitioned writing is providing the set of partition values to which a particular data file will belong. Iceberg provides a few ways to specify this information for a new data file (reference):

- `withPartition`: accepts an `org.apache.iceberg.StructLike` instance on which it can call `get` to access the different partition values.
- `withPartitionPath`: accepts a `String newPartitionPath`, which it splits based on the partition spec and the `=` and `/` characters.
- `withPartitionValues`: accepts a `List<String> partitionValues`.

We would need to decide what to accept from the user and finalize the API; see the sketch below for the `withPartitionPath` flavor.
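To make the options concrete, here is a minimal sketch of `withPartitionPath` against Iceberg's `DataFiles` builder; the spec, path, size, and row count are all placeholders:

```java
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;

final class PartitionPathSketch {
    // Register a parquet file under the partition "year=2021/month=01" using withPartitionPath.
    static DataFile dataFileFor(final PartitionSpec spec, final String path, final long sizeBytes, final long rows) {
        return DataFiles.builder(spec)
                .withPath(path) // e.g. ".../data/year=2021/month=01/part-00000.parquet"
                .withFormat(FileFormat.PARQUET)
                .withPartitionPath("year=2021/month=01") // split on '=' and '/' against the partition spec
                .withFileSizeInBytes(sizeBytes)
                .withRecordCount(rows)
                .build();
    }
}
```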
Do we support reading partitioned Iceberg?
Yes, we do. cc: @lbooker42
private static void verifyAppendCompatibility(
        final Schema icebergSchema,
        final TableDefinition tableDefinition) {
    // Check that all columns in the table definition are part of the Iceberg schema
Is it an error to write a table with extra columns not in the Iceberg schema? Or should we only write the matching columns?
I have added an Iceberg instruction for verifying compatibility, in case the user wants to verify that the data being appended/overwritten is compatible with the original table.
- For appending, we check that all required columns are present in the data with compatible types, and no extra columns outside of the schema are present.
- For overwriting, we check if the schema is identical.
If the user wants to override these checks, they can disable them through Iceberg instructions.
I can add more details in the comments for the new Iceberg instruction.
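For illustration only (not the PR's actual implementation), the append-side check described above could look roughly like this; `TableDefinition.getColumnNames()` is assumed, and type-compatibility checking is omitted:

```java
import java.util.HashSet;
import java.util.Set;

import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

import io.deephaven.engine.table.TableDefinition;

final class AppendCompatibilitySketch {
    // Required Iceberg columns must be present, and no columns outside the schema are allowed.
    static void verify(final Schema icebergSchema, final TableDefinition tableDefinition) {
        final Set<String> dhColumns = new HashSet<>(tableDefinition.getColumnNames());
        for (final Types.NestedField field : icebergSchema.columns()) {
            if (field.isRequired() && !dhColumns.contains(field.name())) {
                throw new IllegalArgumentException("Missing required column: " + field.name());
            }
            dhColumns.remove(field.name());
        }
        if (!dhColumns.isEmpty()) {
            throw new IllegalArgumentException("Columns not present in the Iceberg schema: " + dhColumns);
        }
    }
}
```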
It's the "and no extra columns outside of the schema are present" test that concerns me. We will be requiring a user to dropColumns() to meet this compatibility metric when I'm not sure it's important at all. "Compatible" is pretty broad in definition (IMO); it should not mean "identical".
I agree that this can be restrictive. That is why I added an optional Iceberg instruction so that the user can disable validation if they are sure about what they are adding.
Let me keep this thread open to see what everyone thinks.
From the perspective of DH controlling the writing, I think we can be opinionated, and prefer to give the user less control than we might otherwise want / need to. I don't think it makes sense to allow the user to specify they want to write out Deephaven columns to the parquet file that aren't mapped to the Iceberg table. By default, it may be appropriate to always use the latest Schema at the time it is being written to, but I think we need to allow the user to pick the Schema they want to use for writing. IMO, the physical parquet columns we write should be a (non-strict) subset of that Schema's columns. If there is a map between a DH column and a Schema column, we write it to parquet; otherwise, we exclude it. This also means that every column we write out in this way has an Iceberg field_id we can map into parquet's field_id.
> IMO, the physical parquet columns we write should be a (non-strict) subset of that Schema's columns.

I have something similar right now, along with an extra check that all the required columns from the schema are present in the tables being appended.
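To make the "non-strict subset" suggestion concrete, a rough sketch of selecting only the Deephaven columns that map into the chosen Schema and carrying each one's Iceberg field id forward; the rename-map shape and `TableDefinition.getColumnNames()` are assumptions here:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

import io.deephaven.engine.table.TableDefinition;

final class ColumnSelectionSketch {
    // Write the intersection of DH columns and the chosen Schema, keeping each column's
    // Iceberg field id so it can be recorded as the parquet field_id.
    static Map<String, Integer> fieldIdsToWrite(
            final Schema schema,
            final TableDefinition tableDefinition,
            final Map<String, String> icebergToDhRenames) { // Iceberg name -> DH name (assumed shape)
        final Map<String, Integer> fieldIdByDhColumn = new HashMap<>();
        for (final Types.NestedField field : schema.columns()) {
            final String dhName = icebergToDhRenames.getOrDefault(field.name(), field.name());
            if (tableDefinition.getColumnNames().contains(dhName)) {
                fieldIdByDhColumn.put(dhName, field.fieldId());
            }
            // DH columns that do not map into the Schema are simply not written.
        }
        return fieldIdByDhColumn;
    }
}
```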
@@ -33,95 +36,18 @@
 */
public abstract class ParquetInstructions implements ColumnToCodecMappings {

    private static volatile String defaultCompressionCodecName = CompressionCodecName.SNAPPY.toString();
Removing unnecessary configuration parameters.
@@ -433,6 +382,14 @@ public boolean useDictionary() {
    public void useDictionary(final boolean useDictionary) {
        this.useDictionary = useDictionary;
    }

    public OptionalInt getFieldId() {
The field-ID-related logic may change when #6156 gets merged.
/**
 * A {@link Map map} of rename instructions from Iceberg to Deephaven column names to use when reading the Iceberg
 * data files.
 */
public abstract Map<String, String> columnRenames();
/**
 * A one-to-one {@link Map map} from Deephaven to Iceberg column names to use when writing deephaven tables to
 * Iceberg tables.
 */
public abstract Map<String, String> dhToIcebergColumnRenames();
Ditto, this is also tied w/ a specific Schema. From a configuration point, I see the ease-of-use for using strings, but I wonder if we should have Map<NestedField, String>, or Map<Integer, String> + Schema (the user can still use strings, but we materialize into this). I also suggest we model it with the Iceberg data as the key, since it's the target system that is dictating uniqueness in this case (otherwise, we need to add a check that there are no duplicate values). Technically, we could support writing a single DH column to multiple Iceberg columns, although I don't see a reason to offer that out of the gate without a clear use case.
I like this idea, although I would prefer to do this change as part of a separate PR covering both the reading and writing sides together. For now, I have added a check that there are no duplicate values.
I can also link these comments in the issue #6124.
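A small sketch of the "materialize into field ids" idea, assuming the user still supplies strings keyed on the Iceberg side:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

final class RenameMaterializationSketch {
    // Resolve user-supplied (Iceberg name -> DH name) renames into a field-id-keyed map.
    static Map<Integer, String> materialize(final Schema schema, final Map<String, String> icebergToDhRenames) {
        final Map<Integer, String> byFieldId = new HashMap<>();
        for (final Map.Entry<String, String> entry : icebergToDhRenames.entrySet()) {
            final Types.NestedField field = schema.findField(entry.getKey());
            if (field == null) {
                throw new IllegalArgumentException("Unknown Iceberg column: " + entry.getKey());
            }
            byFieldId.put(field.fieldId(), entry.getValue());
        }
        return byFieldId;
    }
}
```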
 * The inverse map of {@link #dhToIcebergColumnRenames()}.
 */
@Value.Lazy
public Map<String, String> icebergToDhColumnRenames() {
I see you do have the inverse, but I think this should be the source of the data, and not a view of it. I think we should also consider if we even want to provide as a public helper, or if it should be only for internal use.
Same response as #5989 (comment)
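For reference, the uniqueness check plus inverse view can be as small as this sketch (names are illustrative, not the PR's code):

```java
import java.util.HashMap;
import java.util.Map;

final class RenameInversionSketch {
    // Derive the Iceberg -> DH view while enforcing that the forward map is one-to-one.
    static Map<String, String> invert(final Map<String, String> dhToIceberg) {
        final Map<String, String> inverse = new HashMap<>(dhToIceberg.size());
        dhToIceberg.forEach((dhName, icebergName) -> {
            if (inverse.putIfAbsent(icebergName, dhName) != null) {
                throw new IllegalArgumentException(
                        "Multiple Deephaven columns map to the same Iceberg column: " + icebergName);
            }
        });
        return inverse;
    }
}
```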
 * A one-to-one {@link Map map} from Deephaven to Iceberg column names to use when writing deephaven tables to
 * Iceberg tables.
 */
// TODO Please suggest better name for this method, on the read side its just called columnRenames
Pending TODO
Partial review of IcebergCatalogAdapter. Looking pretty good so far!
public void overwrite(
        @NotNull final TableIdentifier tableIdentifier,
        @NotNull final Table[] dhTables,
        @Nullable final IcebergWriteInstructions instructions) {
    writeImpl(tableIdentifier, dhTables, instructions, true, true);
}
So, basically, you're suggesting that the API revolve around immutable parameter structs with builders? It's probably marginally more annoying to users in the console, but it lets us reduce overload spam.
We could also just get out of the business of taking POJO table identifiers (or Strings; standardize on one or the other), cutting overloads by half. (Shivam points out that this will be automatically addressed by interposing the TableAdapter layer.)
Table args can just be a varargs list at the end of the method, cutting overloads again by half.
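A hypothetical shape (not the PR's current API) of how the single-identifier plus varargs form could read; `writeImpl` and `IcebergWriteInstructions` come from this PR, and `TableIdentifier.parse` is Iceberg's string parser:

```java
// One string identifier, an immutable instructions struct, and trailing varargs tables.
public void overwrite(
        @NotNull final String tableIdentifier,
        @Nullable final IcebergWriteInstructions instructions,
        @NotNull final Table... dhTables) {
    writeImpl(TableIdentifier.parse(tableIdentifier), dhTables, instructions, true, true);
}
```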
newNamespaceCreated = createNamespaceIfNotExists(tableIdentifier.namespace());
newSpecAndSchema = createSpecAndSchema(useDefinition, writeInstructions);
icebergTable = createNewIcebergTable(tableIdentifier, newSpecAndSchema, writeInstructions);
Can this stuff be done as a transaction? Possibly with removing/adding the data files, as well?
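For reference, a sketch (not the PR's implementation) of how Iceberg's Transaction API could stage table creation and the first append so they commit atomically; the schema, spec, and data-file arguments are assumed to be whatever createSpecAndSchema and the parquet-writing step produce, and the Iceberg imports are omitted for brevity:

```java
private static void createTableAndAppendAtomically(
        final Catalog catalog,
        final TableIdentifier tableIdentifier,
        final Schema schema,
        final PartitionSpec partitionSpec,
        final List<DataFile> dataFiles) {
    final Transaction txn = catalog.newCreateTableTransaction(tableIdentifier, schema, partitionSpec);
    final AppendFiles append = txn.newAppend();
    dataFiles.forEach(append::appendFile);
    append.commit();          // stages the append inside the transaction
    txn.commitTransaction();  // table and first snapshot become visible together
}
```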
// Write the data to parquet files
int count = 0;
for (final Table dhTable : dhTables) {
    final String filename = String.format(
I thought names came from the catalog. Table locations or whatever.
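If we do want the table to dictate locations, a sketch along these lines uses the table's LocationProvider instead of hand-built paths (the filename pattern itself is illustrative):

```java
// Fully-qualified names used to keep the fragment self-contained.
final org.apache.iceberg.io.LocationProvider locations = icebergTable.locationProvider();
final String filename = String.format("%s-%05d.parquet", java.util.UUID.randomUUID(), count);
final String dataLocation = locations.newDataLocation(filename);
```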
/**
 * Commit the changes to the Iceberg table by creating snapshots.
 */
private static void commit(
Placeholder for @rcaudy
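As a reference point for that discussion, the minimal commit path is a single append operation per snapshot; an overwrite flavor would start from `icebergTable.newOverwrite()` instead, and `dataFiles` is assumed to have been collected while writing the parquet files:

```java
final org.apache.iceberg.AppendFiles append = icebergTable.newAppend();
for (final org.apache.iceberg.DataFile dataFile : dataFiles) {
    append.appendFile(dataFile);
}
append.commit(); // one new snapshot containing all of the newly written files
```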
maximum_dictionary_size: Optional[int] = None,
target_page_size: Optional[int] = None,
verify_schema: Optional[bool] = None,
dh_to_iceberg_column_renames: Optional[Dict[str, str]] = None,
name is very long, especially if a user is specifying it. Any reason it can't just be column_renames?
you should also look through the rest of the API to see if column_renames or col_renames would be most consistent. I would guess col_renames.
if compression_codec_name is not None:
    builder.compressionCodecName(compression_codec_name)

if maximum_dictionary_keys is not None:
    builder.maximumDictionaryKeys(maximum_dictionary_keys)

if maximum_dictionary_size is not None:
    builder.maximumDictionarySize(maximum_dictionary_size)

if target_page_size is not None:
    builder.targetPageSize(target_page_size)

if verify_schema is not None:
    builder.verifySchema(verify_schema)

if dh_to_iceberg_column_renames is not None:
    for dh_name, iceberg_name in dh_to_iceberg_column_renames.items():
        builder.putDhToIcebergColumnRenames(dh_name, iceberg_name)

if table_definition is not None:
    builder.tableDefinition(TableDefinition(table_definition).j_table_definition)

if data_instructions is not None:
    builder.dataInstructions(data_instructions.j_object)
I suspect all of these cases can have is not None removed. Confirm with @jmao-denver on what he wants to see.
        tables: List[Table],
        partition_paths: Optional[List[str]] = None,
        instructions: Optional[IcebergParquetWriteInstructions] = None):
    # TODO Review javadoc in this file once again
todo
        table_identifier: str,
        tables: List[Table],
        partition_paths: Optional[List[str]] = None,
        instructions: Optional[IcebergParquetWriteInstructions] = None):
missing a return type hint
        instructions: Optional[IcebergParquetWriteInstructions] = None):
    # TODO Review javadoc in this file once again
    """
    Append the provided Deephaven table as a new partition to the existing Iceberg table in a single snapshot. This
this says "table" and "partition", but the input is a list of tables. Does that mean multiple tables go to one partition or multiple partitions? etc.
        tables: List[Table],
        partition_paths: Optional[List[str]] = None,
see other comments
        table_identifier: str,
        tables: List[Table],
        partition_paths: Optional[List[str]] = None,
        instructions: Optional[IcebergParquetWriteInstructions] = None):
missing a return type hint
    of data files that were written. Users can use this list to create a transaction/snapshot if needed.

    Args:
        table_identifier (str): the identifier string for iceberg table to write to.
grammar
        tables (List[Table]): the tables to write.
        partition_paths (Optional[List[str]]): the partitioning path at which data would be written, for example,
            "year=2021/month=01". If omitted, we will try to write data to the table without partitioning.
see other comments
        partition_paths (Optional[List[str]]): the partitioning path at which data would be written, for example,
            "year=2021/month=01". If omitted, we will try to write data to the table without partitioning.
        instructions (Optional[IcebergParquetWriteInstructions]): the instructions for customizations while writing.
    """
All above cases that are missing the return type hint are also missing docs on the return value
Closes: #6125
Should be merged after #6156, #6268
Also moves existing Iceberg tests from JUnit 4 to JUnit 5.