Enable failure recovery for Iceberg connector #10622
14f2a06 to f4cb94d
plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergQueryRunner.java
    return queryRunner;
}
catch (Exception e) {
    queryRunner.close();
closeSuppressing
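A minimal sketch of what the one-word suggestion above seems to ask for: closing the query runner in the catch block without letting a failure in close() replace the original exception. The class and method below are hypothetical and written only for illustration; the actual helper the reviewer had in mind is not shown in this thread.

```java
// Hypothetical illustration, not code from the PR.
final class CloseSuppressingExample
{
    private CloseSuppressingExample() {}

    // Closes the resource; if close() itself fails, the close failure is attached
    // to the original failure as a suppressed exception instead of replacing it.
    static <T extends Throwable> T closeSuppressing(T failure, AutoCloseable resource)
    {
        try {
            resource.close();
        }
        catch (Throwable closeFailure) {
            // A throwable cannot suppress itself
            if (closeFailure != failure) {
                failure.addSuppressed(closeFailure);
            }
        }
        return failure;
    }
}
```

In the catch block quoted above this would be used roughly as `throw closeSuppressing(e, queryRunner);`, so the exception from setup stays the primary error.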
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;

public class TestIcebergFailureRecovery
        extends AbstractTestFailureRecovery
AbstractTest is an old naming convention; these days we call them Base...Test
Extracted to #10659
plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergFailureRecovery.java
assertThatThrownBy(() -> {
    testTableModification(
Sounds like you're working against the testTableModification abstraction.
Yeah - I guess we do not need this test here at all. It would make sense to document the behaviour difference for Iceberg if the test was part of the superclass.
I will drop it for now, but it should be moved to the superclass, and we should start using TestingConnectorBehavior in the superclass.
        Optional.of("CREATE TABLE <table> WITH (partitioning = ARRAY['p']) AS SELECT *, 0 p FROM orders"),
        "DELETE FROM <table> WHERE p = (SELECT min(nationkey) FROM nation)",
        Optional.of("DROP TABLE <table>"));
}).hasMessageContaining("This connector only supports delete where one or more identity-transformed partitions are deleted entirely");
newline before .has...
// This test does is not connector specific and should be moved out of AbstractTestFailureRecovery
throw new SkipException("not relevant for Iceberg");
nit: grammar is off
why not enable it for now here?
when is it going to be removed?
I will add it here for now, as extracting it out of the base class requires significant restructuring.
As for timeline: 🤷
While I agree we should implement such a procedure, failure recovery should not leave garbage behind in situations where this can be avoided. Iceberg writes files with random names. The file name is determined here: trino/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java, line 305 in 863da2b
we should consider
cc @phd3 @alexjo2144
Current state of the PR - LGTM % comments.
#10622 (comment) fits well as a followup.
25bcc15 to b36010f
@findepi I added some code for that. PTAL and tell me what you think.
b36010f to 53a739a
"Clean up extraneous output files in Iceberg DML"
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java
@@ -88,4 +91,10 @@ public DataSize getMaxScannedFileSize()
{
    return maxScannedFileSize;
}

@JsonProperty
public boolean isRetriesEnabled()
could the RetryMode be provided by the engine to the finish*() methods, so that we don't have to embed the info in handles?
We could, but I would opt for what we do now. It feels more natural, as we know the information upfront. Also it allows for rejecting the request sooner if given retry mode is not supported by a connector.
allows for rejecting the request sooner if given retry mode is not supported by a connector.
I did not suggest not providing it to the begin methods.
It would be asymmetric vs. other "stuff" we pass to "begin*" methods (layout, list of columns). I will leave it as is.
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
private boolean isFileCreatedByQuery(String fileName, String queryId)
{
    return fileName.startsWith(queryId);
maybe add a comment that one query id cannot be a prefix of another query id
That is a good point actually.
Changed to
verify(!queryId.contains("-"), "queryId(%s) should not contain hyphens", queryId);
return fileName.startsWith(queryId + "-");
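To illustrate why the delimiter matters, here is a small standalone sketch contrasting the two checks. The class name and the sample ids and file names used with it are made up for illustration, and a plain IllegalArgumentException stands in for the verify(...) call so the snippet has no dependencies.

```java
// Hypothetical standalone illustration; not the code from IcebergMetadata.
final class QueryFilePrefixExample
{
    private QueryFilePrefixExample() {}

    // First revision: also matches files of any query whose id merely starts with queryId
    static boolean startsWithQueryId(String fileName, String queryId)
    {
        return fileName.startsWith(queryId);
    }

    // Revised check: the trailing hyphen marks the end of the query id part of the name,
    // which only works if the id itself never contains a hyphen
    static boolean isFileCreatedByQuery(String fileName, String queryId)
    {
        if (queryId.contains("-")) {
            throw new IllegalArgumentException("queryId(" + queryId + ") should not contain hyphens");
        }
        return fileName.startsWith(queryId + "-");
    }
}
```

With a made-up file name "q12-data.orc", the first check attributes the file to query "q1" as well as "q12", while the revised check attributes it only to "q12".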
53a739a to 018609c
With query/task retries there is a chance that extra files, which do not make it into the snapshot, are written to the table's directory. While most such cases should be cleaned up by writers on workers, there is a slim chance that some of them will survive query execution (e.g. if a worker machine is killed). This commit adds a pre-commit routine on the coordinator which deletes what remained. This is still opportunistic and not 100% sure to delete everything, as extra files may still be written after the cleanup routine has already completed, but we are trying our best. The remaining files do not affect query correctness.
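A rough sketch of the selection step such a pre-commit routine needs, under the assumption that files written by a query are named with the query id followed by a hyphen. The class, method, and parameter names are hypothetical, and the sketch works on plain file names rather than on a real file system; the actual deletion would go through whatever file system API the connector uses.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;

// Hypothetical illustration, not the routine added by this commit.
final class ExtraneousFilesExample
{
    private ExtraneousFilesExample() {}

    // Returns the names of files that were written by the given query
    // but are not among the files being committed to the snapshot.
    static List<String> findExtraneousFiles(
            Collection<String> fileNamesInDirectory,
            String queryId,
            Set<String> committedFileNames)
    {
        List<String> extraneous = new ArrayList<>();
        for (String fileName : fileNamesInDirectory) {
            boolean createdByQuery = fileName.startsWith(queryId + "-");
            if (createdByQuery && !committedFileNames.contains(fileName)) {
                extraneous.add(fileName);
            }
        }
        return extraneous;
    }
}
```

The result is only a snapshot of the directory at listing time, which matches the opportunistic nature described above: a straggling writer can still create a file after the listing was taken.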
018609c to 955d243
CI flake: #10631
Caveat:
With the current logic it is possible that a data file written during a DML operation, which in the end is not part of the committed table snapshot, remains in the table directory on the distributed filesystem.
It should be a rare situation.
Orphaned files can be cleaned up via the remove_orphan_files routine using Spark (https://iceberg.apache.org/#spark-procedures/). Eventually we want to have a similar routine in Trino (#10623).
fixes: #10253