
Spark 3.4: Fix rewrite_position_deletes for certain partition types #8059

Merged (5 commits, Jul 14, 2023)

Conversation

@szehon-ho (Collaborator) commented Jul 13, 2023

Fixes: #8045

This error comes from the code that performs 'removeDanglingDeletes'.

In this code, for each partition of a group of delete files, I am trying to find 'live' data files so I can determine which deletes no longer apply (are dangling) and can be cleaned up.

The issue is that I use the DeleteFile's partition values directly to query the data_files metadata table. I expected this to work because the data_files table uses the transformed partition values, just as the DeleteFile partition values should.

However, the partition data of a DeleteFile is not the same type as what is exposed in the Spark metadata table.

The fix is to convert the partition values into types that can be used in a Spark query.
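
To make the mismatch concrete, here is a rough sketch (assuming spark, tableName, and deleteFile are in scope; the partition column name is made up): for a date partition, the DeleteFile's partition struct holds Iceberg's internal representation, an Integer of days since epoch, while the data_files metadata table exposes a Spark DateType, so a filter built from the raw value never matches.

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.lit;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Illustrative only: query the data_files metadata table with a raw partition value.
Dataset<Row> dataFiles =
    spark.read().format("iceberg").load(tableName + ".data_files");
// The raw value is Iceberg's internal type, e.g. an Integer for a date partition:
Object raw = deleteFile.partition().get(0, Object.class);
// This compares a DateType column to an integer literal, so live files are missed:
Dataset<Row> live = dataFiles.filter(col("partition.dt").equalTo(lit(raw)));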

import org.junit.rules.TemporaryFolder;
import org.junit.runners.Parameterized;

public class TestRewritePositionDeleteFiles extends SparkExtensionsTestBase {
@szehon-ho (Collaborator, Author) commented Jul 13, 2023

Notes:

  1. I can probably merge the other TestRewritePositionDeleteFilesAction into this one later. That one was not in a project with sql-extensions, and I find it a lot easier to populate tables with SQL support than to write a new Java class for each partition type I am trying to test (see the sketch after these notes).

  2. This tests all the partition types that may hit this error, per https://iceberg.apache.org/spec/#avro, with regard to the removeDanglingDeletes query. Note that some of these types do not exist in Spark (UUID, time).
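
For instance, with the sql-extensions test base, covering a partition type takes a couple of statements via the sql() helper rather than a new fixture class (illustrative table schema, not the actual test):

sql("CREATE TABLE %s (id BIGINT, dt DATE) USING iceberg PARTITIONED BY (dt)", tableName);
sql("INSERT INTO %s VALUES (1, DATE '2023-07-13'), (2, DATE '2023-07-14')", tableName);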

@szehon-ho (Collaborator, Author):

Ignore point 2; after discussion with @aokolnychyi, it's not really Avro but Iceberg types in general that need conversion.

@szehon-ho added this to the Iceberg 1.3.1 milestone on Jul 13, 2023
 * @return object converted to its logical type
 */
private Object convertPartitionValue(Object value, Type.TypeID typeId) {
  switch (typeId) {
Contributor:

I understand there is a type mismatch but don't we have to convert internal Iceberg values for a given type to a Spark value? Why Avro?

Contributor:

We are getting back Iceberg literals and need to come up with Spark literals. We have PartitionData under the hood that is populated by Avro, but that should not matter at all. We need to build a reverse conversion similar to what we already have in SparkValueConverter.

@szehon-ho (Collaborator, Author):

Yea, makes sense. It's not really to do with Avro; rather, we need an Iceberg => Spark conversion, just as we have the Spark => Iceberg direction in SparkValueConverter. Added that in the latest commit.
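
A minimal sketch of what such a reverse converter could look like, mirroring SparkValueConverter in the other direction; the method name and the exact set of cases are assumptions for illustration, not the committed code:

import java.nio.ByteBuffer;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.util.ByteBuffers;
import org.apache.iceberg.util.DateTimeUtil;

// Hypothetical Iceberg => Spark value conversion for partition filter literals.
static Object convertToSpark(Object value, Type.TypeID typeId) {
  if (value == null) {
    return null;
  }
  switch (typeId) {
    case DATE: // Iceberg stores dates as int days since epoch
      return java.sql.Date.valueOf(DateTimeUtil.dateFromDays((Integer) value));
    case TIMESTAMP: // Iceberg stores timestamps as long micros since epoch
      return java.sql.Timestamp.from(
          DateTimeUtil.timestamptzFromMicros((Long) value).toInstant());
    case UUID: // Spark has no UUID type; compare as a string
      return value.toString();
    case FIXED:
    case BINARY: // ByteBuffer => byte[]
      return ByteBuffers.toByteArray((ByteBuffer) value);
    default:
      return value; // numbers, strings, booleans pass through unchanged
  }
}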

@szehon-ho (Collaborator, Author):

@aokolnychyi added a test with a partition transform (as discussed offline).

@aokolnychyi (Contributor) left a comment:

The implementation seems correct. The redundant comments should be removed, and we need to double-check nulls in filters.

Column col = col("partition." + fields.get(i).name());
- return col.equalTo(value);
+ return col.equalTo(lit(converted));
Contributor:

Does this work correctly when the partition value is null? Do we have to use eqNullSafe?

@szehon-ho (Collaborator, Author) Jul 14, 2023:

Yep, done, and added a test (manually verified that we get into the partition-pruning ManifestEvaluator isNull block).
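
For reference, a sketch of the difference (the column name is illustrative):

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.lit;

import org.apache.spark.sql.Column;

Object converted = null; // a partition value that may legitimately be null

// equalTo compiles to SQL '=': comparing against NULL yields NULL, so rows
// with a null partition value never match, even when they should.
Column unsafe = col("partition.dt").equalTo(lit(converted));

// eqNullSafe compiles to '<=>', which treats NULL <=> NULL as true.
Column safe = col("partition.dt").eqNullSafe(lit(converted));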

@szehon-ho (Collaborator, Author) left a comment:

Thanks, addressed comments

List<DeleteFile> rewrittenDeletes,
List<DeleteFile> newDeletes,
int expectedGroups) {
Assert.assertEquals(
Contributor:

Iceberg is currently moving from JUnit4 to JUnit5 and, in the same step, updating assertions to AssertJ. It would be great to adjust all of these assertions to AssertJ; otherwise it'll add additional work later on.
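
A representative before/after using the names from the snippet above (not the exact diff):

import static org.assertj.core.api.Assertions.assertThat;

// JUnit4 style:
// Assert.assertEquals("Expected number of groups", expectedGroups, newDeletes.size());

// AssertJ style:
assertThat(newDeletes).as("Expected number of groups").hasSize(expectedGroups);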

@aokolnychyi (Contributor) left a comment:

Looks good to me. Eduard has a comment, and then this should be good to go. I am OK with addressing it in a separate change if this blocks the release. If we still have time, it would be great to fix it now. Up to you, @szehon-ho.

@szehon-ho (Collaborator, Author):

Sure, let me do this in a follow-up PR.

@szehon-ho merged commit 25dc421 into apache:master on Jul 14, 2023 (31 checks passed)
laithalzyoud added a commit to revolut-engineering/iceberg that referenced this pull request Jan 30, 2024
* Hive: Set commit state as Unknown before throwing CommitStateUnknownException (apache#7931) (apache#8029)

* Spark 3.4: WAP branch not propagated when using DELETE without WHERE (apache#7900) (apache#8028)

* Core: Include all reachable snapshots with v1 format and REF snapshot mode (apache#7621) (apache#8027)

* Spark 3.3: Backport 'WAP branch not propagated when using DELETE without WHERE' (apache#8033) (apache#8036)

* Flink: remove the creation of default database in FlinkCatalog open method (apache#7795) (apache#8039)

* Core: Handle optional fields (apache#8050) (apache#8064)

* Core: Handle allow optional fields

We expect:

- current-snapshot-id
- properties
- snapshots

to be there, but they are actually optional.

* Use AssertJ

* Core: Abort file groups should be under same lock as committerService (apache#7933) (apache#8060)

* Spark 3.4: Fix rewrite_position_deletes for certain partition types (apache#8059)

* Spark 3.3: Fix rewrite_position_deletes for certain partition types (apache#8059) (apache#8069)

* Spark: Add actions for disaster recovery.

* Fix the compile error.

* Fix merge conflicts and formatting

* All tests are working and code integrated with Spark 3.3

* Fix union error and snapshots test

* Fix Spark broadcast error

* Add RewritePositionDeleteFilesSparkAction

---------

Co-authored-by: Eduard Tudenhoefner <etudenhoefner@gmail.com>
Co-authored-by: Fokko Driesprong <fokko@apache.org>
Co-authored-by: Xianyang Liu <liu-xianyang@hotmail.com>
Co-authored-by: Szehon Ho <szehon.apache@gmail.com>
Co-authored-by: Yufei Gu <yufei_gu@apple.com>
Co-authored-by: yufeigu <yufei@apache.org>
Co-authored-by: Laith Alzyoud <laith.alzyoud@revolut.com>
Co-authored-by: vaultah <4944562+vaultah@users.noreply.github.com>
Successfully merging this pull request may close: rewrite_position_delete_files leads to error (#8045)