-
Notifications
You must be signed in to change notification settings - Fork 195
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add progress check callback to update partition ownership in S3 scan source #4918
Add progress check callback to update partition ownership in S3 scan source #4918
Conversation
82819c2
to
d4bea1a
Compare
@@ -209,6 +215,17 @@ private void startProcessingObject(final long waitTimeMillis) { | |||
} | |||
partitionKeys.remove(objectToProcess.get().getPartitionKey()); | |||
}, ACKNOWLEDGEMENT_SET_TIMEOUT); | |||
|
|||
acknowledgementSet.addProgressCheck( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When will this scheduled event gets unregistered?
Also, do we register another similar scheduled event for DynamoDB pull based source?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking at the code for AcknowledgmentSet, the acknowledgmentSet.complete() call cancels the future for this progress check
@@ -302,7 +300,8 @@ public <S extends T> void saveProgressStateForPartition(final String partitionKe | |||
|
|||
try { | |||
sourceCoordinationStore.tryUpdateSourcePartitionItem(itemToUpdate); | |||
} catch (final PartitionUpdateException e) { | |||
} catch (final Exception e) { | |||
LOG.info("Exception while saving state for the partition {}: {}", partitionKey, e.getMessage()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be an ERROR
log and should include the stack trace since you don't know the underlying cause if you reach this code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks yeah it should be
try { | ||
sourceCoordinationStore.tryUpdateSourcePartitionItem(itemToUpdate); | ||
} catch (final Exception e) { | ||
LOG.info("Exception while updating acknowledgment wait for the partition {}: {}", partitionKey, e.getMessage()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be an ERROR
log and should include the stack trace since you don't know the underlying cause if you reach this code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will update
@@ -209,6 +215,17 @@ private void startProcessingObject(final long waitTimeMillis) { | |||
} | |||
partitionKeys.remove(objectToProcess.get().getPartitionKey()); | |||
}, ACKNOWLEDGEMENT_SET_TIMEOUT); | |||
|
|||
acknowledgementSet.addProgressCheck( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This approach only works for end-to-end acknowledgements enabled. The root cause is an issue writing to the buffer, not a failure to write to the sink. So this change may actually hide the bug some.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes it does only resolve the issue for acknowledgments being enabled. But this aligns with how we handle visibility timeout increase on the S3-SQS source.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should correct both of these really. But, this is fine for now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@graytaylor0 I agree with David. The solution should be independent of acks enabled or not. In case of Sqs, the progress check is needed only in case of acks enabled because when acks are not enabled, the sqs message is deleted immediately. It looks like this solution is a hack.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ownership tracking, renewal, cancellation etc should be part of the source coordinator itself, and not in S3 code. Isn't this issue possible in other sources that use source coordinator?
acknowledgementSet.addProgressCheck( | ||
(ratio) -> { | ||
try { | ||
sourceCoordinator.saveProgressStateForPartition(objectToProcess.get().getPartitionKey(), null); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you are saving null to keep holding onto the partition. This has some bleeding in the internal details. I think we should have a dedicated API for it. Something like: sourceCoordinator.renewPartition(objectToProcess.get().getPartitionKey());
Internally it can save null
, but this will decouple the details from the caller.
I do see one other similar usage of saving null
. It would be better to clean this up before it continues to expand.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's fair yeah probably should've been 2 separate methods from the beginning.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, can you create bug issue about this so that we don't lose track?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Made the issue here (#4928)
…source Signed-off-by: Taylor Gray <tylgry@amazon.com>
d4bea1a
to
ce9bca0
Compare
@@ -209,6 +215,17 @@ private void startProcessingObject(final long waitTimeMillis) { | |||
} | |||
partitionKeys.remove(objectToProcess.get().getPartitionKey()); | |||
}, ACKNOWLEDGEMENT_SET_TIMEOUT); | |||
|
|||
acknowledgementSet.addProgressCheck( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should correct both of these really. But, this is fine for now.
Description
This change adds an asynchronous progressCheck ack callback to update the partition ownership for S3 scan source.
This fixes an issue where circuit breaker or full buffer can cause the synchronous thread to be blocked that was previously updating the partition ownership timeout.
Tested by blocking the write to the buffer and confirming that the partition for an object had it's partition ownership updated every 2 minutes
Issues Resolved
Related to #4422
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.