-
Notifications
You must be signed in to change notification settings - Fork 210
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add folder-based partitioning for s3 scan source #4455
Add folder-based partitioning for s3 scan source #4455
Conversation
bb70cd0
to
fd397be
Compare
Signed-off-by: Taylor Gray <tylgry@amazon.com>
fd397be
to
9b584fa
Compare
@@ -104,6 +104,16 @@ boolean isCodecProvidedWhenNeeded() { | |||
return true; | |||
} | |||
|
|||
@AssertTrue(message = "acknowledgments and delete_s3_objects_on_read must both be set to true when using PREFIX partition mode") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor: we should probably be consistent on the naming. I see both prefix partition and folder partition being used.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good catch this was an outdated message where I initially had a mode
of PREFIX or FULL_OBJECT_KEY
Signed-off-by: Taylor Gray <tylgry@amazon.com>
|
||
static final Duration NO_OBJECTS_FOUND_BEFORE_PARTITION_DELETION_DURATION = Duration.ofMinutes(3); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need to bump this up.
if (folderPartitionProgressState.isEmpty()) { | ||
folderPartitionProgressState = Optional.of(new S3SourceProgressState(Instant.now().toEpochMilli())); | ||
} else { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is duplicate. Can be moved to beginning to the method.
|
||
AcknowledgementSet acknowledgementSet = null; | ||
String activeAcknowledgmentSetId = null; | ||
while (objectIndex < objectsToProcess.size() && objectsProcessed < folderPartitioningOptions.getMaxObjectsPerOwnership()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method can be split into smaller methods for readability. We can move this while loop to different method.
Signed-off-by: Taylor Gray <tylgry@amazon.com>
Description
This change adds support for the s3 scan source to partition based on folders instead of objects. This means that, unlike object partitions, folder partitions will never be marked as complete, and each node / worker that grabs a folder partition will list and process objects in that folder, then give it up. The use case for this is order guarantee within S3 folders, as well as aggregations within S3 folders, as the same worker / node will process objects from the same S3 folder. To enable this mode, one must set the following configuration.
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.