-
Notifications
You must be signed in to change notification settings - Fork 213
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Write stream events that timeout to write to internal buffer in separate thread #4524
Changes from all commits
28d500a
50b940b
f261e3c
5c95c71
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,13 +29,16 @@ | |
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import java.time.Instant; | ||
import java.util.ArrayList; | ||
import java.util.List; | ||
import java.util.Optional; | ||
import java.util.Set; | ||
import java.util.concurrent.ExecutorService; | ||
import java.util.concurrent.Executors; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.locks.Lock; | ||
import java.util.concurrent.locks.ReentrantLock; | ||
|
||
import static org.opensearch.dataprepper.model.source.s3.S3ScanEnvironmentVariables.STOP_S3_SCAN_PROCESSING_PROPERTY; | ||
import static org.opensearch.dataprepper.plugins.mongo.client.BsonHelper.JSON_WRITER_SETTINGS; | ||
|
@@ -57,6 +60,7 @@ public class StreamWorker { | |
static final String BYTES_PROCESSED = "bytesProcessed"; | ||
private static final long MILLI_SECOND = 1_000_000L; | ||
private static final String UPDATE_DESCRIPTION = "updateDescription"; | ||
private static final int BUFFER_WRITE_TIMEOUT_MILLIS = 15_000; | ||
private final RecordBufferWriter recordBufferWriter; | ||
private final PartitionKeyRecordConverter recordConverter; | ||
private final DataStreamPartitionCheckpoint partitionCheckpoint; | ||
|
@@ -73,11 +77,17 @@ public class StreamWorker { | |
private final int streamBatchSize; | ||
private boolean stopWorker = false; | ||
private final ExecutorService executorService; | ||
private String lastLocalCheckpoint; | ||
private Long lastLocalRecordCount = null; | ||
private String lastLocalCheckpoint = null; | ||
private long lastLocalRecordCount = 0; | ||
Optional<S3PartitionStatus> s3PartitionStatus = Optional.empty(); | ||
private Integer currentEpochSecond; | ||
private int recordsSeenThisSecond = 0; | ||
final List<Event> records = new ArrayList<>(); | ||
final List<Long> recordBytes = new ArrayList<>(); | ||
long lastBufferWriteTime = System.currentTimeMillis(); | ||
private String checkPointToken = null; | ||
private long recordCount = 0; | ||
private final Lock lock; | ||
|
||
public static StreamWorker create(final RecordBufferWriter recordBufferWriter, | ||
final PartitionKeyRecordConverter recordConverter, | ||
|
@@ -119,13 +129,14 @@ public StreamWorker(final RecordBufferWriter recordBufferWriter, | |
this.bytesReceivedSummary = pluginMetrics.summary(BYTES_RECEIVED); | ||
this.bytesProcessedSummary = pluginMetrics.summary(BYTES_PROCESSED); | ||
this.executorService = Executors.newSingleThreadExecutor(BackgroundThreadFactory.defaultExecutorThreadFactory("mongodb-stream-checkpoint")); | ||
this.lock = new ReentrantLock(); | ||
if (sourceConfig.isAcknowledgmentsEnabled()) { | ||
// starts acknowledgement monitoring thread | ||
streamAcknowledgementManager.init((Void) -> stop()); | ||
} else { | ||
// checkpoint in separate thread | ||
this.executorService.submit(this::checkpointStream); | ||
} | ||
// buffer write and checkpoint in separate thread on timeout | ||
this.executorService.submit(this::bufferWriteAndCheckpointStream); | ||
|
||
} | ||
|
||
private MongoCursor<ChangeStreamDocument<Document>> getChangeStreamCursor(final MongoCollection<Document> collection, | ||
|
@@ -156,15 +167,16 @@ private boolean shouldWaitForS3Partition(final String collection) { | |
|
||
public void processStream(final StreamPartition streamPartition) { | ||
Optional<String> resumeToken = streamPartition.getProgressState().map(StreamProgressState::getResumeToken); | ||
resumeToken.ifPresent(token -> checkPointToken = token); | ||
Optional<Long> loadedRecords = streamPartition.getProgressState().map(StreamProgressState::getLoadedRecords); | ||
loadedRecords.ifPresent(count -> recordCount = count); | ||
|
||
final String collectionDbName = streamPartition.getCollection(); | ||
List<String> collectionDBNameList = List.of(collectionDbName.split(COLLECTION_SPLITTER)); | ||
if (collectionDBNameList.size() < 2) { | ||
throw new IllegalArgumentException("Invalid Collection Name. Must be in db.collection format"); | ||
} | ||
long recordCount = 0; | ||
final List<Event> records = new ArrayList<>(); | ||
final List<Long> recordBytes = new ArrayList<>(); | ||
String checkPointToken = null; | ||
|
||
try (MongoClient mongoClient = MongoDBConnection.getMongoClient(sourceConfig)) { | ||
// Access the database | ||
MongoDatabase database = mongoClient.getDatabase(collectionDBNameList.get(0)); | ||
|
@@ -189,7 +201,6 @@ public void processStream(final StreamPartition streamPartition) { | |
throw new IllegalStateException("S3 partitions are not created. Please check the S3 partition creator thread."); | ||
} | ||
recordConverter.initializePartitions(s3Partitions); | ||
long lastBufferWriteTime = System.currentTimeMillis(); | ||
while (!Thread.currentThread().isInterrupted() && !stopWorker) { | ||
if (cursor.hasNext()) { | ||
try { | ||
|
@@ -208,7 +219,6 @@ record = document.getFullDocument().toJson(JSON_WRITER_SETTINGS); | |
final long bytes = record.getBytes().length; | ||
bytesReceivedSummary.record(bytes); | ||
|
||
checkPointToken = document.getResumeToken().toJson(JSON_WRITER_SETTINGS); | ||
final Optional<BsonDocument> primaryKeyDoc = Optional.ofNullable(document.getDocumentKey()); | ||
final String primaryKeyBsonType = primaryKeyDoc.map(bsonDocument -> bsonDocument.get(DOCUMENTDB_ID_FIELD_NAME).getBsonType().name()).orElse(UNKNOWN_TYPE); | ||
final Event event = recordConverter.convert(record, eventCreateTimeEpochMillis, eventCreationTimeEpochNanos, | ||
|
@@ -220,18 +230,19 @@ record = document.getFullDocument().toJson(JSON_WRITER_SETTINGS); | |
event.delete(DOCUMENTDB_ID_FIELD_NAME); | ||
records.add(event); | ||
recordBytes.add(bytes); | ||
recordCount += 1; | ||
|
||
if ((recordCount % recordFlushBatchSize == 0) || (System.currentTimeMillis() - lastBufferWriteTime >= bufferWriteIntervalInMs)) { | ||
LOG.debug("Write to buffer for line {} to {}", (recordCount - recordFlushBatchSize), recordCount); | ||
writeToBuffer(records, checkPointToken, recordCount); | ||
lastLocalCheckpoint = checkPointToken; | ||
lastLocalRecordCount = recordCount; | ||
lastBufferWriteTime = System.currentTimeMillis(); | ||
bytesProcessedSummary.record(recordBytes.stream().mapToLong(Long::longValue).sum()); | ||
records.clear(); | ||
recordBytes.clear(); | ||
|
||
lock.lock(); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Missing some context. Isn't this worker instance supposed to be owned by a single thread? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We now have two threads: the main thread and a new background thread. The main loop buffers records and writes them when the batch size or flush interval is reached. The background thread writes if the main loop has not written within the timeout duration. The main thread will be blocked if there are no records/updates in the stream, so it may not have flushed the data it has buffered. |
||
try { | ||
recordCount += 1; | ||
checkPointToken = document.getResumeToken().toJson(JSON_WRITER_SETTINGS); | ||
|
||
if ((recordCount % recordFlushBatchSize == 0) || (System.currentTimeMillis() - lastBufferWriteTime >= bufferWriteIntervalInMs)) { | ||
writeToBuffer(); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. So we are writing to the buffer both in the main loop and in the thread? What does that accomplish exactly? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The main loop buffers records and writes them when the batch size or flush interval is reached. The background thread writes if the main loop has not written within the timeout duration. The main thread will be blocked if there are no records/updates in the stream, so it may not have flushed the data it has buffered. |
||
} | ||
} finally { | ||
lock.unlock(); | ||
} | ||
|
||
} else if(shouldTerminateChangeStream(operationType)){ | ||
stop(); | ||
partitionCheckpoint.resetCheckpoint(); | ||
|
@@ -240,7 +251,6 @@ record = document.getFullDocument().toJson(JSON_WRITER_SETTINGS); | |
LOG.warn("The change stream operation type {} is not handled", operationType); | ||
} | ||
} catch(Exception e){ | ||
// TODO handle documents with size > 10 MB. | ||
// this will only happen if writing to buffer gets interrupted from shutdown, | ||
// otherwise it's infinite backoff and retry | ||
LOG.error("Failed to add records to buffer with error", e); | ||
|
@@ -268,6 +278,11 @@ record = document.getFullDocument().toJson(JSON_WRITER_SETTINGS); | |
|
||
System.clearProperty(STOP_S3_SCAN_PROCESSING_PROPERTY); | ||
|
||
// stop other threads for this worker | ||
stop(); | ||
|
||
partitionCheckpoint.giveUpPartition(); | ||
|
||
// shutdown acknowledgement monitoring thread | ||
if (streamAcknowledgementManager != null) { | ||
streamAcknowledgementManager.shutdown(); | ||
|
@@ -304,17 +319,53 @@ private void writeToBuffer(final List<Event> records, final String checkPointTok | |
successItemsCounter.increment(records.size()); | ||
} | ||
|
||
private void checkpointStream() { | ||
private void writeToBuffer() { | ||
LOG.debug("Write to buffer for line {} to {}", (recordCount - recordFlushBatchSize), recordCount); | ||
writeToBuffer(records, checkPointToken, recordCount); | ||
lastLocalCheckpoint = checkPointToken; | ||
lastLocalRecordCount = recordCount; | ||
lastBufferWriteTime = System.currentTimeMillis(); | ||
bytesProcessedSummary.record(recordBytes.stream().mapToLong(Long::longValue).sum()); | ||
records.clear(); | ||
recordBytes.clear(); | ||
} | ||
|
||
private void bufferWriteAndCheckpointStream() { | ||
long lastCheckpointTime = System.currentTimeMillis(); | ||
while (!Thread.currentThread().isInterrupted() && !stopWorker) { | ||
if (lastLocalRecordCount != null && (System.currentTimeMillis() - lastCheckpointTime >= checkPointIntervalInMs)) { | ||
LOG.debug("Perform regular checkpoint for resume token {} at record count {}", lastLocalCheckpoint, lastLocalRecordCount); | ||
partitionCheckpoint.checkpoint(lastLocalCheckpoint, lastLocalRecordCount); | ||
lastCheckpointTime = System.currentTimeMillis(); | ||
if (!records.isEmpty() && lastBufferWriteTime < Instant.now().minusMillis(BUFFER_WRITE_TIMEOUT_MILLIS).toEpochMilli()) { | ||
lock.lock(); | ||
LOG.debug("Writing to buffer due to buffer write delay"); | ||
try { | ||
writeToBuffer(); | ||
} catch(Exception e){ | ||
// this will only happen if writing to buffer gets interrupted from shutdown, | ||
// otherwise it's infinite backoff and retry | ||
LOG.error("Failed to add records to buffer with error", e); | ||
failureItemsCounter.increment(records.size()); | ||
} finally { | ||
lock.unlock(); | ||
} | ||
} | ||
|
||
if (!sourceConfig.isAcknowledgmentsEnabled()) { | ||
if (System.currentTimeMillis() - lastCheckpointTime >= checkPointIntervalInMs) { | ||
try { | ||
lock.lock(); | ||
LOG.debug("Perform regular checkpoint for resume token {} at record count {}", lastLocalCheckpoint, lastLocalRecordCount); | ||
partitionCheckpoint.checkpoint(lastLocalCheckpoint, lastLocalRecordCount); | ||
} catch (Exception e) { | ||
LOG.warn("Exception checkpointing the current state. The stream record processing will start from previous checkpoint.", e); | ||
stop(); | ||
} finally { | ||
lock.unlock();; | ||
} | ||
lastCheckpointTime = System.currentTimeMillis(); | ||
} | ||
} | ||
|
||
try { | ||
Thread.sleep(checkPointIntervalInMs); | ||
Thread.sleep(BUFFER_WRITE_TIMEOUT_MILLIS); | ||
} catch (InterruptedException ex) { | ||
break; | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will this call stop() when exceptions are hit? Do we want to catch exceptions to keep the streamAcknowledgmentManager thread from crashing? Or is just stopping fine?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will have a separate PR for handling streamAcknowledgmentManager. We will need to handle some edge cases.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added the change.