Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Write stream events that timeout to write to internal buffer in separate thread #4524

Merged
merged 4 commits into from
May 10, 2024

Conversation

dinujoh
Copy link
Member

@dinujoh dinujoh commented May 9, 2024

Write stream events that timeout to write to internal buffer in separate thread

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

…ate thread

Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>
graytaylor0
graytaylor0 previously approved these changes May 9, 2024
LOG.debug("Perform regular checkpoint for resume token {} at record count {}", lastLocalCheckpoint, lastLocalRecordCount);
partitionCheckpoint.checkpoint(lastLocalCheckpoint, lastLocalRecordCount);
} catch (Exception e) {
LOG.error("Exception checkpointing the current state. New thread should start the stream from previous checkpoint.", e);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: This could just be a warn level log

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will update

checkPointToken = document.getResumeToken().toJson(JSON_WRITER_SETTINGS);

if ((recordCount % recordFlushBatchSize == 0) || (System.currentTimeMillis() - lastBufferWriteTime >= bufferWriteIntervalInMs)) {
writeToBuffer();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we are writing to buffer in the main loop and in the thread? What does that accomplish exactly?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Main loop buffers record and writes when batch size or flush time is reached. The thread writes if main loop has not written for the time duration. The main thread will be blocked if there are no records/updates in stream and it has not flushed the data.

@@ -119,13 +128,14 @@ public StreamWorker(final RecordBufferWriter recordBufferWriter,
this.bytesReceivedSummary = pluginMetrics.summary(BYTES_RECEIVED);
this.bytesProcessedSummary = pluginMetrics.summary(BYTES_PROCESSED);
this.executorService = Executors.newSingleThreadExecutor(BackgroundThreadFactory.defaultExecutorThreadFactory("mongodb-stream-checkpoint"));
this.lock = new ReentrantLock();
if (sourceConfig.isAcknowledgmentsEnabled()) {
// starts acknowledgement monitoring thread
streamAcknowledgementManager.init((Void) -> stop());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this call stop() when exceptions are hit? Do we want to catch exceptions to keep the streamAcknowledgmentManager thread from crashing? Or is just stopping fine?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will have a separate PR for handling streamAcknowledgmentManager. We will need to handle some edge cases.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the change.

@@ -310,9 +310,9 @@ void test_processStream_checkPointIntervalSuccess() {
verify(cursor).close();
verify(cursor, times(4)).hasNext();
verify(mockPartitionCheckpoint).getGlobalS3FolderCreationStatus(collection);
verify(mockPartitionCheckpoint, atLeast(1)).checkpoint(resumeToken3, 3);
//verify(mockPartitionCheckpoint, atLeast(1)).checkpoint(resumeToken3, 3);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Some extra comments in the tests

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will take a look

records.clear();
recordBytes.clear();

lock.lock();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing some context. Isn't this worker instance supposed to be owned by a single thread?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have main thread and new thread now. Main loop buffers record and writes when batch size or flush time is reached. The thread writes if main loop has not written for the time duration. The main thread will be blocked if there are no records/updates in stream and it has not flushed the data.

Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>
Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>
chenqi0805
chenqi0805 previously approved these changes May 9, 2024
Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>
@dinujoh dinujoh merged commit f3ac6b7 into opensearch-project:main May 10, 2024
46 of 47 checks passed
@kkondaka kkondaka added this to the v2.8 milestone May 14, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants