
Enable failure recovery for Iceberg connector #10622

Merged: 4 commits into trinodb:master from lo/test-iceberg-failure-recovery on Feb 14, 2022

Conversation

@losipiuk (Member) commented on Jan 14, 2022

Caveat:

With the current logic it is possible that a data file written during a DML operation, which in the end is not part of the committed table snapshot, remains in the table directory on the distributed filesystem.
This should be a rare situation:

  • If the task which writes a data file fails, the unfinished file is deleted by the task itself.
  • If there are two concurrently running attempts for a single task, the other is killed when one completes. Killing is subject to a race, though, and if both attempts manage to complete successfully, the extraneous file will remain in the table directory.

Orphaned files can be cleaned via the remove_orphan_files routine using Spark (https://iceberg.apache.org/#spark-procedures/); see the example call below.
Eventually we want to have a similar routine in Trino (#10623).
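For reference, a sketch of the Spark procedure call (the catalog and table names are placeholders, not taken from this PR):

    -- delete files in the table location that are no longer referenced by table metadata
    CALL spark_catalog.system.remove_orphan_files(table => 'db.sample');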

fixes: #10253

@cla-bot cla-bot bot added the cla-signed label Jan 14, 2022
@losipiuk losipiuk requested review from findepi and arhimondr January 14, 2022 17:27
@losipiuk losipiuk self-assigned this Jan 14, 2022
            return queryRunner;
        }
        catch (Exception e) {
            queryRunner.close();
Member

closeSuppressing
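What the reviewer is likely suggesting: close the runner without masking the original failure, via airlift's Closeables.closeAllSuppress. A sketch of the pattern, assuming the surrounding query runner builder (not the exact diff):

    import static io.airlift.testing.Closeables.closeAllSuppress;

    try {
        // ... create schema, load tables ...
        return queryRunner;
    }
    catch (Exception e) {
        // Attach any close() failure as a suppressed exception
        // instead of replacing the original cause.
        closeAllSuppress(e, queryRunner);
        throw e;
    }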

import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;

public class TestIcebergFailureRecovery
        extends AbstractTestFailureRecovery
Member

AbstractTest is an old naming convention, these days we call them Base...Test

Member Author

Extracted to #10659

Comment on lines 79 to 80
assertThatThrownBy(() -> {
    testTableModification(
Member

Sounds like you're working against the testTableModification abstraction.

Member Author

Yeah, I guess we do not need this test here at all. It would make sense to document the behaviour difference for Iceberg if the test were part of the superclass.
I will drop it for now, but it should be moved to the superclass, and we should start using TestingConnectorBehavior there.

Optional.of("CREATE TABLE <table> WITH (partitioning = ARRAY['p']) AS SELECT *, 0 p FROM orders"),
"DELETE FROM <table> WHERE p = (SELECT min(nationkey) FROM nation)",
Optional.of("DROP TABLE <table>"));
}).hasMessageContaining("This connector only supports delete where one or more identity-transformed partitions are deleted entirely");
Member

newline before .has...

Comment on lines 90 to 91
// This test does is not connector specific and should be moved out of AbstractTestFailureRecovery
throw new SkipException("not relevant for Iceberg");
Member

nit: grammar is off

why not enable it here for now?
when is it going to be removed?

Member Author

I will enable it here for now, as extracting it out of the base class requires significant restructuring.

As for timeline: 🤷

@findepi (Member) commented on Jan 18, 2022

Orphaned files can be cleaned via the remove_orphan_files routine using Spark (https://iceberg.apache.org/#spark-procedures/).
Eventually we want to have a similar routine in Trino (#10623)

While I agree we should implement such a procedure, failure recovery should not leave garbage behind in situations where this can be avoided.

Iceberg writes files with random names. The file name is determined here

String fileName = fileFormat.addExtension(randomUUID().toString());

we should consider:

  • adding queryId to the file name (see the sketch after this list)
  • in io.trino.plugin.iceberg.IcebergMetadata#finishInsert, etc., when failure recovery is enabled, going over the table destination folder (as determined by the LocationProvider) and finding leftover files
    • we could do this only if there were some failures (but currently finish... doesn't know that)
    • we could try to limit ourselves to directories where we wrote some files -- this way we wouldn't scan all the sub-directories of a partitioned table, but only the ones we insert data into
      • this will work when the LocationProvider is deterministic. For a non-deterministic LocationProvider we would need quite a different approach (e.g. registering write locations before creating a file), as a remove_orphan_files-like procedure might have problems as well
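A minimal sketch of the queryId-prefixed naming being suggested (session and fileFormat are assumed to be in scope, as in the surrounding connector code; this is illustrative, not the PR's exact diff):

    // Prefix the random UUID with the query id, so files left behind by
    // failed attempts can be matched back to the query that wrote them.
    String fileName = fileFormat.addExtension(session.getQueryId() + "-" + randomUUID());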

cc @phd3 @alexjo2144

@findepi (Member) left a comment
Current state of the PR - LGTM % comments.

#10622 (comment) fits well as a follow-up.

@losipiuk losipiuk force-pushed the lo/test-iceberg-failure-recovery branch 3 times, most recently from 25bcc15 to b36010f Compare January 18, 2022 15:40
@losipiuk (Member Author) commented

(quoting the suggestion above about queryId-prefixed file names and scanning the table destination folder in finish*)

@findepi I added some code for that. PTAL and tell me what you think.

@losipiuk losipiuk force-pushed the lo/test-iceberg-failure-recovery branch from b36010f to 53a739a Compare January 19, 2022 12:57
@findepi (Member) left a comment

"Cleanup extraneous output files in Iceberg DML"

@@ -88,4 +91,10 @@ public DataSize getMaxScannedFileSize()
{
    return maxScannedFileSize;
}

@JsonProperty
public boolean isRetriesEnabled()
{
    return retriesEnabled;
}
Member

could the RetryMode be provided by the engine to the finish*() methods, so that we don't have to embed the info in handles?

Member Author

We could, but I would opt for what we do now. It feels more natural, as we know the information upfront. It also allows for rejecting the request sooner if a given retry mode is not supported by a connector.

Member

allows for rejecting the request sooner if a given retry mode is not supported by a connector.

I did not suggest not to provide it to the begin* methods.

Member Author

That would be asymmetric vs. the other "stuff" we pass to the begin* methods (layout, list of columns). I will leave it as is.
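For context, the pattern under discussion, reduced to a self-contained sketch (all names here are illustrative, not the Trino SPI): the engine hands the retry mode to begin*(), the connector can reject an unsupported mode upfront, and it records the mode in the handle so finish*() knows whether to run cleanup.

    enum RetryMode { NO_RETRIES, TASK_RETRIES }

    record InsertHandle(String tableName, boolean retriesEnabled) {}

    class MetadataSketch
    {
        InsertHandle beginInsert(String tableName, RetryMode retryMode)
        {
            // Rejecting here fails the query before any work starts --
            // the "sooner" rejection mentioned above.
            if (retryMode == RetryMode.TASK_RETRIES && !supportsTaskRetries(tableName)) {
                throw new UnsupportedOperationException("retries not supported for " + tableName);
            }
            return new InsertHandle(tableName, retryMode != RetryMode.NO_RETRIES);
        }

        void finishInsert(InsertHandle handle)
        {
            if (handle.retriesEnabled()) {
                // opportunistically delete files left behind by failed attempts
            }
        }

        private boolean supportsTaskRetries(String tableName)
        {
            return true; // placeholder: a real connector would check table properties
        }
    }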


private boolean isFileCreatedByQuery(String fileName, String queryId)
{
    return fileName.startsWith(queryId);
}
Member

maybe add a comment that one query id cannot be a prefix of another query id

Member Author

That is a good point actually.

Changed to

        verify(!queryId.contains("-"), "queryId(%s) should not contain hyphens", queryId);
        return fileName.startsWith(queryId + "-");

@losipiuk losipiuk force-pushed the lo/test-iceberg-failure-recovery branch from 53a739a to 018609c Compare February 10, 2022 18:16
With query/task retries there is a chance that extra files, which
do not make it into the snapshot, are written to the table directory.
While most such cases should be cleaned up by the writers on the workers,
there is a slim chance that some of those files will survive query
execution (e.g. if a worker machine is killed).

This commit adds a pre-commit routine on the coordinator which deletes
what remained. This is still opportunistic and not guaranteed to delete
everything, as extra files may still be written after the cleanup routine
has already completed, but we are trying our best. Any remaining files do
not affect query correctness.
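A rough sketch of the cleanup idea the commit describes (java.nio is used here for brevity; the actual PR works against the distributed filesystem API, and all names are illustrative):

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.Set;
    import java.util.stream.Stream;

    final class ExtraFileCleanup
    {
        // Best-effort, pre-commit: delete files carrying this query's prefix
        // that were not committed to the snapshot. Files written after this
        // scan are missed, but they do not affect correctness.
        static void cleanExtraOutputFiles(Path writeDirectory, String queryId, Set<String> committedFileNames)
                throws IOException
        {
            String prefix = queryId + "-"; // queryId contains no "-", so the prefix is unambiguous
            try (Stream<Path> files = Files.list(writeDirectory)) {
                for (Path file : (Iterable<Path>) files::iterator) {
                    String name = file.getFileName().toString();
                    if (name.startsWith(prefix) && !committedFileNames.contains(name)) {
                        Files.deleteIfExists(file);
                    }
                }
            }
        }
    }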
@losipiuk losipiuk force-pushed the lo/test-iceberg-failure-recovery branch from 018609c to 955d243 Compare February 11, 2022 09:29
@losipiuk (Member Author) commented

CI flake: #10631

@losipiuk losipiuk merged commit 6047480 into trinodb:master Feb 14, 2022
@github-actions github-actions bot added this to the 371 milestone Feb 14, 2022
@losipiuk losipiuk mentioned this pull request Feb 15, 2022
Development

Successfully merging this pull request may close these issues.

Add tests for Iceberg connector correctness with full query retries
4 participants