Add progress check callback to update partition ownership in S3 scan source #4918

Merged

graytaylor0 (Member) commented on Sep 5, 2024

Description

This change adds an asynchronous progressCheck ack callback to update the partition ownership for S3 scan source.

This fixes an issue where a tripped circuit breaker or a full buffer could block the synchronous thread that was previously responsible for updating the partition ownership timeout.

Tested by blocking the write to the buffer and confirming that the partition for an object had its partition ownership updated every 2 minutes.
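
For readers who want the idea in isolation, here is a minimal sketch of renewing ownership from a scheduler thread while the worker thread is blocked. It is not the code in this PR: `PartitionLease`, `ProgressCheckSketch`, the 10-minute extension, and the millisecond intervals are all hypothetical, and `Thread.sleep` stands in for a write that is stuck on a full buffer or an open circuit breaker.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the ownership record kept by a source coordinator.
class PartitionLease {
    private final AtomicInteger renewals = new AtomicInteger();
    private volatile Instant ownershipTimeout = Instant.now();

    void renew(final Duration extension) {
        ownershipTimeout = Instant.now().plus(extension);
        renewals.incrementAndGet();
    }

    int renewalCount() {
        return renewals.get();
    }

    Instant ownershipTimeout() {
        return ownershipTimeout;
    }
}

class ProgressCheckSketch {
    // Renews the lease from a scheduler thread while the calling thread is blocked,
    // then returns how many renewals happened during the block.
    static int renewWhileBlocked(final long blockMillis, final long intervalMillis) {
        final PartitionLease lease = new PartitionLease();
        final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            final ScheduledFuture<?> progressCheck = scheduler.scheduleAtFixedRate(
                    () -> lease.renew(Duration.ofMinutes(10)),
                    intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
            try {
                // Simulates the worker thread stuck writing to the buffer.
                Thread.sleep(blockMillis);
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            progressCheck.cancel(false);
            return lease.renewalCount();
        } finally {
            scheduler.shutdownNow();
        }
    }
}
```

Calling `ProgressCheckSketch.renewWhileBlocked(300, 50)` should report several renewals even though the calling thread did nothing but wait, which is the behavior the manual test above was checking for.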

Issues Resolved

Related to #4422

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@@ -209,6 +215,17 @@ private void startProcessingObject(final long waitTimeMillis) {
}
partitionKeys.remove(objectToProcess.get().getPartitionKey());
}, ACKNOWLEDGEMENT_SET_TIMEOUT);

acknowledgementSet.addProgressCheck(
Contributor

When will this scheduled event get unregistered?
Also, do we register a similar scheduled event for the DynamoDB pull-based source?

Member Author

Looking at the code for AcknowledgementSet, the acknowledgementSet.complete() call cancels the future for this progress check.
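
A rough sketch of that lifecycle, assuming the progress check is backed by a `ScheduledFuture`. `SketchAcknowledgementSet` and `CompleteCancelsSketch` are hypothetical, simplified stand-ins, not Data Prepper's actual classes or signatures:

```java
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified stand-in; not Data Prepper's AcknowledgementSet.
class SketchAcknowledgementSet {
    private final ScheduledExecutorService scheduler;
    private ScheduledFuture<?> progressCheckFuture;

    SketchAcknowledgementSet(final ScheduledExecutorService scheduler) {
        this.scheduler = scheduler;
    }

    // Registers a callback that fires repeatedly until the set completes.
    synchronized void addProgressCheck(final Runnable callback, final Duration interval) {
        progressCheckFuture = scheduler.scheduleAtFixedRate(
                callback, interval.toMillis(), interval.toMillis(), TimeUnit.MILLISECONDS);
    }

    // Completing the set cancels the periodic callback so it stops firing.
    synchronized void complete() {
        if (progressCheckFuture != null) {
            progressCheckFuture.cancel(false);
        }
    }

    synchronized boolean isProgressCheckCancelled() {
        return progressCheckFuture != null && progressCheckFuture.isCancelled();
    }
}

class CompleteCancelsSketch {
    // Registers a no-op progress check, completes the set, and reports
    // whether the scheduled callback was cancelled as a result.
    static boolean run() {
        final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            final SketchAcknowledgementSet set = new SketchAcknowledgementSet(scheduler);
            set.addProgressCheck(() -> { }, Duration.ofMinutes(2));
            set.complete();
            return set.isProgressCheckCancelled();
        } finally {
            scheduler.shutdownNow();
        }
    }
}
```

Under this model there is no separate unregister step: the callback lives exactly as long as the set it was registered on.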

@@ -302,7 +300,8 @@ public <S extends T> void saveProgressStateForPartition(final String partitionKe

try {
sourceCoordinationStore.tryUpdateSourcePartitionItem(itemToUpdate);
} catch (final PartitionUpdateException e) {
} catch (final Exception e) {
LOG.info("Exception while saving state for the partition {}: {}", partitionKey, e.getMessage());
Member

This should be an ERROR log and should include the stack trace since you don't know the underlying cause if you reach this code.

Member Author

Thanks, yeah, it should be.
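
To illustrate the difference, here is a small sketch that logs at error level and passes the exception itself instead of `e.getMessage()`, so the stack trace is kept. It uses `java.util.logging` only so that it runs without dependencies; `ErrorLoggingSketch`, `tryUpdate`, and `capture` are hypothetical names. With an SLF4J-style logger, the equivalent is to pass the exception as the last argument to `LOG.error(...)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

class ErrorLoggingSketch {
    private static final Logger LOG = Logger.getLogger(ErrorLoggingSketch.class.getName());

    // Hypothetical stand-in for a store update that fails for an unknown reason.
    static void tryUpdate(final String partitionKey) {
        throw new IllegalStateException("simulated store failure for " + partitionKey);
    }

    static void saveProgressState(final String partitionKey) {
        try {
            tryUpdate(partitionKey);
        } catch (final Exception e) {
            // Passing the exception itself (not e.getMessage()) keeps the stack trace.
            // With SLF4J the equivalent is LOG.error("... {}", partitionKey, e).
            LOG.log(Level.SEVERE, "Exception while saving state for the partition " + partitionKey, e);
        }
    }

    // Captures log records so the behavior can be checked without reading console output.
    static List<LogRecord> capture(final Runnable action) {
        final List<LogRecord> records = new ArrayList<>();
        final Handler handler = new Handler() {
            @Override
            public void publish(final LogRecord record) {
                records.add(record);
            }

            @Override
            public void flush() {
            }

            @Override
            public void close() {
            }
        };
        LOG.addHandler(handler);
        try {
            action.run();
        } finally {
            LOG.removeHandler(handler);
        }
        return records;
    }
}
```

The captured record carries the original exception via `getThrown()`, so the underlying cause is still visible when the catch block is this broad.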

try {
    sourceCoordinationStore.tryUpdateSourcePartitionItem(itemToUpdate);
} catch (final Exception e) {
    LOG.info("Exception while updating acknowledgment wait for the partition {}: {}", partitionKey, e.getMessage());
Member

This should be an ERROR log and should include the stack trace since you don't know the underlying cause if you reach this code.

Member Author

Will update

@@ -209,6 +215,17 @@ private void startProcessingObject(final long waitTimeMillis) {
}
partitionKeys.remove(objectToProcess.get().getPartitionKey());
}, ACKNOWLEDGEMENT_SET_TIMEOUT);

acknowledgementSet.addProgressCheck(
Member

This approach only works when end-to-end acknowledgements are enabled. The root cause is an issue writing to the buffer, not a failure to write to the sink. So this change may actually hide the bug somewhat.

Member Author

Yes, it only resolves the issue when acknowledgments are enabled, but this aligns with how we handle the visibility timeout increase in the S3-SQS source.

Member

We should really correct both of these, but this is fine for now.

Collaborator

@graytaylor0 I agree with David. The solution should work regardless of whether acks are enabled. In the SQS case, the progress check is needed only when acks are enabled, because when they are not, the SQS message is deleted immediately. This solution looks like a hack.

Collaborator

Ownership tracking, renewal, cancellation, etc. should be part of the source coordinator itself, not the S3 code. Isn't this issue possible in other sources that use the source coordinator?

acknowledgementSet.addProgressCheck(
        (ratio) -> {
            try {
                sourceCoordinator.saveProgressStateForPartition(objectToProcess.get().getPartitionKey(), null);
Member

I think you are saving null to keep holding onto the partition. This leaks some internal details to the caller. I think we should have a dedicated API for it, something like: sourceCoordinator.renewPartition(objectToProcess.get().getPartitionKey()); Internally it can save null, but this will decouple the details from the caller.

I do see one other similar usage of saving null. It would be better to clean this up before it continues to expand.
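
One possible shape for that API, sketched with a default interface method. `SketchSourceCoordinator` and `RecordingCoordinator` are hypothetical, trimmed-down types for illustration only; `renewPartition` is the name proposed above and does not exist yet.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, trimmed-down coordinator interface for illustration only.
interface SketchSourceCoordinator<T> {
    void saveProgressStateForPartition(String partitionKey, T progressState);

    // Dedicated renewal API: callers no longer need to know that
    // saving a null state is what extends ownership internally.
    default void renewPartition(final String partitionKey) {
        saveProgressStateForPartition(partitionKey, null);
    }
}

// Records each call so the delegation can be observed.
class RecordingCoordinator implements SketchSourceCoordinator<String> {
    final List<String> calls = new ArrayList<>();

    @Override
    public void saveProgressStateForPartition(final String partitionKey, final String progressState) {
        calls.add(partitionKey + "=" + progressState);
    }
}
```

The caller would then write `coordinator.renewPartition(key)` and the null-state detail stays inside the coordinator, where it can later be replaced by a real renewal path without touching the sources.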

Member Author

That's fair, yeah. These probably should've been two separate methods from the beginning.

Member

Actually, can you create a bug issue about this so that we don't lose track?

Member Author

Made the issue here (#4928)

…source

Signed-off-by: Taylor Gray <tylgry@amazon.com>

graytaylor0 merged commit 7e9866a into opensearch-project:main on Sep 10, 2024
47 checks passed
graytaylor0 deleted the S3SourceProgressCheck branch on September 10, 2024 16:14
4 participants