-
Notifications
You must be signed in to change notification settings - Fork 213
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Write stream events that timeout to write to internal buffer in separate thread #4524
Conversation
…ate thread Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>
LOG.debug("Perform regular checkpoint for resume token {} at record count {}", lastLocalCheckpoint, lastLocalRecordCount); | ||
partitionCheckpoint.checkpoint(lastLocalCheckpoint, lastLocalRecordCount); | ||
} catch (Exception e) { | ||
LOG.error("Exception checkpointing the current state. New thread should start the stream from previous checkpoint.", e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: This could just be a warn level log
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will update
checkPointToken = document.getResumeToken().toJson(JSON_WRITER_SETTINGS); | ||
|
||
if ((recordCount % recordFlushBatchSize == 0) || (System.currentTimeMillis() - lastBufferWriteTime >= bufferWriteIntervalInMs)) { | ||
writeToBuffer(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So we are writing to buffer in the main loop and in the thread? What does that accomplish exactly?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Main loop buffers record and writes when batch size or flush time is reached. The thread writes if main loop has not written for the time duration. The main thread will be blocked if there are no records/updates in stream and it has not flushed the data.
@@ -119,13 +128,14 @@ public StreamWorker(final RecordBufferWriter recordBufferWriter, | |||
this.bytesReceivedSummary = pluginMetrics.summary(BYTES_RECEIVED); | |||
this.bytesProcessedSummary = pluginMetrics.summary(BYTES_PROCESSED); | |||
this.executorService = Executors.newSingleThreadExecutor(BackgroundThreadFactory.defaultExecutorThreadFactory("mongodb-stream-checkpoint")); | |||
this.lock = new ReentrantLock(); | |||
if (sourceConfig.isAcknowledgmentsEnabled()) { | |||
// starts acknowledgement monitoring thread | |||
streamAcknowledgementManager.init((Void) -> stop()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will this call stop() when exceptions are hit? Do we want to catch exceptions to keep the streamAcknowledgmentManager thread from crashing? Or is just stopping fine?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will have a separate PR for handling streamAcknowledgmentManager. We will need to handle some edge cases.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added the change.
@@ -310,9 +310,9 @@ void test_processStream_checkPointIntervalSuccess() { | |||
verify(cursor).close(); | |||
verify(cursor, times(4)).hasNext(); | |||
verify(mockPartitionCheckpoint).getGlobalS3FolderCreationStatus(collection); | |||
verify(mockPartitionCheckpoint, atLeast(1)).checkpoint(resumeToken3, 3); | |||
//verify(mockPartitionCheckpoint, atLeast(1)).checkpoint(resumeToken3, 3); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Some extra comments in the tests
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will take a look
records.clear(); | ||
recordBytes.clear(); | ||
|
||
lock.lock(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing some context. Isn't this worker instance supposed to be owned by a single thread?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have main thread and new thread now. Main loop buffers record and writes when batch size or flush time is reached. The thread writes if main loop has not written for the time duration. The main thread will be blocked if there are no records/updates in stream and it has not flushed the data.
Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>
Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>
Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>
Write stream events that timeout to write to internal buffer in separate thread
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.