Write stream events that timeout to write to internal buffer in separate thread #4524

Status: Merged (4 commits, May 10, 2024)
@@ -66,6 +66,11 @@ public void checkpoint(final String resumeToken, final long recordCount) {
enhancedSourceCoordinator.saveProgressStateForPartition(streamPartition, CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE);
}

public void extendLease() {
LOG.debug("Extending lease of stream partition for collection {}", streamPartition.getCollection());
enhancedSourceCoordinator.saveProgressStateForPartition(streamPartition, CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE);
}

/**
* This method is to reset checkpoint when change stream is invalid. The current thread will give up partition and new thread
* will take ownership of partition. If change stream is valid then new thread proceeds with processing change stream else the
@@ -53,30 +53,41 @@ private void monitorAcknowledgment(final ExecutorService executorService, final
long lastCheckpointTime = System.currentTimeMillis();
CheckpointStatus lastCheckpointStatus = null;
while (!Thread.currentThread().isInterrupted()) {
final CheckpointStatus checkpointStatus = checkpoints.peek();
if (checkpointStatus != null) {
if (checkpointStatus.isAcknowledged()) {
lastCheckpointStatus = checkpoints.poll();
ackStatus.remove(checkpointStatus.getResumeToken());
if (System.currentTimeMillis() - lastCheckpointTime >= checkPointIntervalInMs) {
LOG.debug("Perform regular checkpointing for resume token {} at record count {}", checkpointStatus.getResumeToken(), checkpointStatus.getRecordCount());
partitionCheckpoint.checkpoint(checkpointStatus.getResumeToken(), checkpointStatus.getRecordCount());
lastCheckpointTime = System.currentTimeMillis();
try {
final CheckpointStatus checkpointStatus = checkpoints.peek();
if (checkpointStatus != null) {
if (checkpointStatus.isAcknowledged()) {
lastCheckpointStatus = checkpoints.poll();
ackStatus.remove(checkpointStatus.getResumeToken());
if (System.currentTimeMillis() - lastCheckpointTime >= checkPointIntervalInMs) {
LOG.debug("Perform regular checkpointing for resume token {} at record count {}", checkpointStatus.getResumeToken(), checkpointStatus.getRecordCount());
partitionCheckpoint.checkpoint(checkpointStatus.getResumeToken(), checkpointStatus.getRecordCount());
lastCheckpointTime = System.currentTimeMillis();
}
} else {
LOG.debug("Checkpoint not complete for resume token {}", checkpointStatus.getResumeToken());
final Duration ackWaitDuration = Duration.between(Instant.ofEpochMilli(checkpointStatus.getCreateTimestamp()), Instant.now());
// Acknowledgement not received for the checkpoint after twice ack wait time
if (ackWaitDuration.getSeconds() >= partitionAcknowledgmentTimeout.getSeconds() * 2) {
// Give up partition and should interrupt parent thread to stop processing stream
if (lastCheckpointStatus != null && lastCheckpointStatus.isAcknowledged()) {
partitionCheckpoint.checkpoint(lastCheckpointStatus.getResumeToken(), lastCheckpointStatus.getRecordCount());
}
LOG.warn("Acknowledgement not received for the checkpoint {} past wait time. Giving up partition.", checkpointStatus.getResumeToken());
partitionCheckpoint.giveUpPartition();
break;
}
}
} else {
LOG.debug("Checkpoint not complete for resume token {}", checkpointStatus.getResumeToken());
final Duration ackWaitDuration = Duration.between(Instant.ofEpochMilli(checkpointStatus.getCreateTimestamp()), Instant.now());
// Acknowledgement not received for the checkpoint after twice ack wait time
if (ackWaitDuration.getSeconds() >= partitionAcknowledgmentTimeout.getSeconds() * 2) {
// Give up partition and should interrupt parent thread to stop processing stream
if (lastCheckpointStatus != null && lastCheckpointStatus.isAcknowledged()) {
partitionCheckpoint.checkpoint(lastCheckpointStatus.getResumeToken(), lastCheckpointStatus.getRecordCount());
}
LOG.warn("Acknowledgement not received for the checkpoint {} past wait time. Giving up partition.", checkpointStatus.getResumeToken());
partitionCheckpoint.giveUpPartition();
break;
if (System.currentTimeMillis() - lastCheckpointTime >= checkPointIntervalInMs) {
LOG.debug("No records processed. Extend the lease of the partition worker.");
partitionCheckpoint.extendLease();
lastCheckpointTime = System.currentTimeMillis();
}
}
} catch (Exception e) {
LOG.warn("Exception monitoring acknowledgments. The stream record processing will start from previous checkpoint.", e);
break;
}

try {
@@ -26,7 +26,7 @@
public class StreamScheduler implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(StreamScheduler.class);
private static final int DEFAULT_TAKE_LEASE_INTERVAL_MILLIS = 60_000;
static final int DEFAULT_CHECKPOINT_INTERVAL_MILLS = 120_000;
static final int DEFAULT_CHECKPOINT_INTERVAL_MILLS = 60_000;
static final int DEFAULT_BUFFER_WRITE_INTERVAL_MILLS = 15_000;
private static final int DEFAULT_MONITOR_WAIT_TIME_MS = 15_000;
/**
@@ -29,13 +29,16 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import static org.opensearch.dataprepper.model.source.s3.S3ScanEnvironmentVariables.STOP_S3_SCAN_PROCESSING_PROPERTY;
import static org.opensearch.dataprepper.plugins.mongo.client.BsonHelper.JSON_WRITER_SETTINGS;
@@ -57,6 +60,7 @@ public class StreamWorker {
static final String BYTES_PROCESSED = "bytesProcessed";
private static final long MILLI_SECOND = 1_000_000L;
private static final String UPDATE_DESCRIPTION = "updateDescription";
private static final int BUFFER_WRITE_TIMEOUT_MILLIS = 15_000;
private final RecordBufferWriter recordBufferWriter;
private final PartitionKeyRecordConverter recordConverter;
private final DataStreamPartitionCheckpoint partitionCheckpoint;
@@ -73,11 +77,17 @@ public class StreamWorker {
private final int streamBatchSize;
private boolean stopWorker = false;
private final ExecutorService executorService;
private String lastLocalCheckpoint;
private Long lastLocalRecordCount = null;
private String lastLocalCheckpoint = null;
private long lastLocalRecordCount = 0;
Optional<S3PartitionStatus> s3PartitionStatus = Optional.empty();
private Integer currentEpochSecond;
private int recordsSeenThisSecond = 0;
final List<Event> records = new ArrayList<>();
final List<Long> recordBytes = new ArrayList<>();
long lastBufferWriteTime = System.currentTimeMillis();
private String checkPointToken = null;
private long recordCount = 0;
private final Lock lock;

public static StreamWorker create(final RecordBufferWriter recordBufferWriter,
final PartitionKeyRecordConverter recordConverter,
@@ -119,13 +129,14 @@ public StreamWorker(final RecordBufferWriter recordBufferWriter,
this.bytesReceivedSummary = pluginMetrics.summary(BYTES_RECEIVED);
this.bytesProcessedSummary = pluginMetrics.summary(BYTES_PROCESSED);
this.executorService = Executors.newSingleThreadExecutor(BackgroundThreadFactory.defaultExecutorThreadFactory("mongodb-stream-checkpoint"));
this.lock = new ReentrantLock();
if (sourceConfig.isAcknowledgmentsEnabled()) {
// starts acknowledgement monitoring thread
streamAcknowledgementManager.init((Void) -> stop());
Review comment (Member):
Will this call stop() when exceptions are hit? Do we want to catch exceptions to keep the streamAcknowledgmentManager thread from crashing? Or is just stopping fine?

Reply (Member, author):
I will have a separate PR for handling streamAcknowledgmentManager. We will need to handle some edge cases.

Reply (Member, author):
Added the change.
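
For context on the resolution above, here is a minimal sketch of the failure-handling pattern the added try/catch gives the acknowledgment-monitoring loop. The names below (AckMonitorSketch, monitorOnce, stopCallback, MONITOR_WAIT_MILLIS) are illustrative assumptions, not the project's actual StreamAcknowledgementManager code: an unexpected exception is logged, the monitor loop exits, and stream processing later resumes from the previous checkpoint.

import java.util.function.Consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Sketch only; names are illustrative, not the actual StreamAcknowledgementManager.
class AckMonitorSketch {
    private static final Logger LOG = LoggerFactory.getLogger(AckMonitorSketch.class);
    private static final long MONITOR_WAIT_MILLIS = 15_000L;

    void monitor(final Consumer<Void> stopCallback) {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                monitorOnce(); // peek/poll checkpoint statuses, checkpoint or give up the partition
            } catch (final Exception e) {
                // Keep the failure visible instead of letting the thread die silently;
                // processing restarts later from the previous checkpoint.
                LOG.warn("Exception monitoring acknowledgments. The stream record processing will start from previous checkpoint.", e);
                break;
            }
            try {
                Thread.sleep(MONITOR_WAIT_MILLIS);
            } catch (final InterruptedException ex) {
                break;
            }
        }
        stopCallback.accept(null); // e.g. the (Void) -> stop() callback passed to init()
    }

    private void monitorOnce() {
        // checkpoint bookkeeping elided in this sketch
    }
}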

} else {
// checkpoint in separate thread
this.executorService.submit(this::checkpointStream);
}
// buffer write and checkpoint in separate thread on timeout
this.executorService.submit(this::bufferWriteAndCheckpointStream);

}

private MongoCursor<ChangeStreamDocument<Document>> getChangeStreamCursor(final MongoCollection<Document> collection,
@@ -156,15 +167,16 @@ private boolean shouldWaitForS3Partition(final String collection) {

public void processStream(final StreamPartition streamPartition) {
Optional<String> resumeToken = streamPartition.getProgressState().map(StreamProgressState::getResumeToken);
resumeToken.ifPresent(token -> checkPointToken = token);
Optional<Long> loadedRecords = streamPartition.getProgressState().map(StreamProgressState::getLoadedRecords);
loadedRecords.ifPresent(count -> recordCount = count);

final String collectionDbName = streamPartition.getCollection();
List<String> collectionDBNameList = List.of(collectionDbName.split(COLLECTION_SPLITTER));
if (collectionDBNameList.size() < 2) {
throw new IllegalArgumentException("Invalid Collection Name. Must be in db.collection format");
}
long recordCount = 0;
final List<Event> records = new ArrayList<>();
final List<Long> recordBytes = new ArrayList<>();
String checkPointToken = null;

try (MongoClient mongoClient = MongoDBConnection.getMongoClient(sourceConfig)) {
// Access the database
MongoDatabase database = mongoClient.getDatabase(collectionDBNameList.get(0));
@@ -189,7 +201,6 @@ public void processStream(final StreamPartition streamPartition) {
throw new IllegalStateException("S3 partitions are not created. Please check the S3 partition creator thread.");
}
recordConverter.initializePartitions(s3Partitions);
long lastBufferWriteTime = System.currentTimeMillis();
while (!Thread.currentThread().isInterrupted() && !stopWorker) {
if (cursor.hasNext()) {
try {
@@ -208,7 +219,6 @@ record = document.getFullDocument().toJson(JSON_WRITER_SETTINGS);
final long bytes = record.getBytes().length;
bytesReceivedSummary.record(bytes);

checkPointToken = document.getResumeToken().toJson(JSON_WRITER_SETTINGS);
final Optional<BsonDocument> primaryKeyDoc = Optional.ofNullable(document.getDocumentKey());
final String primaryKeyBsonType = primaryKeyDoc.map(bsonDocument -> bsonDocument.get(DOCUMENTDB_ID_FIELD_NAME).getBsonType().name()).orElse(UNKNOWN_TYPE);
final Event event = recordConverter.convert(record, eventCreateTimeEpochMillis, eventCreationTimeEpochNanos,
@@ -220,18 +230,19 @@ record = document.getFullDocument().toJson(JSON_WRITER_SETTINGS);
event.delete(DOCUMENTDB_ID_FIELD_NAME);
records.add(event);
recordBytes.add(bytes);
recordCount += 1;

if ((recordCount % recordFlushBatchSize == 0) || (System.currentTimeMillis() - lastBufferWriteTime >= bufferWriteIntervalInMs)) {
LOG.debug("Write to buffer for line {} to {}", (recordCount - recordFlushBatchSize), recordCount);
writeToBuffer(records, checkPointToken, recordCount);
lastLocalCheckpoint = checkPointToken;
lastLocalRecordCount = recordCount;
lastBufferWriteTime = System.currentTimeMillis();
bytesProcessedSummary.record(recordBytes.stream().mapToLong(Long::longValue).sum());
records.clear();
recordBytes.clear();

lock.lock();
Review comment (Collaborator):
Missing some context. Isn't this worker instance supposed to be owned by a single thread?

Reply (Member, author):
We have a main thread and a new thread now. The main loop buffers records and writes when the batch size or flush time is reached. The new thread writes if the main loop has not written for that duration. The main thread can be blocked when there are no records/updates in the stream and it has not flushed its data.

try {
recordCount += 1;
checkPointToken = document.getResumeToken().toJson(JSON_WRITER_SETTINGS);

if ((recordCount % recordFlushBatchSize == 0) || (System.currentTimeMillis() - lastBufferWriteTime >= bufferWriteIntervalInMs)) {
writeToBuffer();
Review comment (Member):
So we are writing to the buffer in the main loop and in the thread? What does that accomplish exactly?

Reply (Member, author):
The main loop buffers records and writes when the batch size or flush time is reached. The thread writes if the main loop has not written for that duration. The main thread can be blocked when there are no records/updates in the stream and it has not flushed the data.
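
To make the two-thread interaction concrete, the following is a minimal, self-contained sketch of the pattern described in this thread, under assumed names (BufferedStreamSketch, onRecord, FLUSH_BATCH_SIZE, FLUSH_TIMEOUT_MILLIS); it is not the actual StreamWorker code. The main loop flushes on batch size or flush interval, a background task flushes the same buffer when the main loop has been idle past the timeout, and a ReentrantLock guards the shared buffer state.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Sketch only; a stand-in for the StreamWorker main loop plus its background flusher.
class BufferedStreamSketch {
    private static final int FLUSH_BATCH_SIZE = 100;          // illustrative value
    private static final long FLUSH_TIMEOUT_MILLIS = 15_000L; // illustrative value

    private final List<String> buffer = new ArrayList<>();
    private final Lock lock = new ReentrantLock();
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private long lastFlushTime = System.currentTimeMillis();

    void start() {
        executor.submit(this::flushOnTimeout); // background flusher thread
    }

    // Called by the main stream loop for every change-stream event.
    void onRecord(final String record) {
        lock.lock();
        try {
            buffer.add(record);
            if (buffer.size() >= FLUSH_BATCH_SIZE
                    || System.currentTimeMillis() - lastFlushTime >= FLUSH_TIMEOUT_MILLIS) {
                flushLocked();
            }
        } finally {
            lock.unlock();
        }
    }

    // Background thread: flushes when the main loop has not flushed recently,
    // for example because the change stream is idle and the cursor is blocking.
    private void flushOnTimeout() {
        while (!Thread.currentThread().isInterrupted()) {
            lock.lock();
            try {
                if (!buffer.isEmpty()
                        && System.currentTimeMillis() - lastFlushTime >= FLUSH_TIMEOUT_MILLIS) {
                    flushLocked();
                }
            } finally {
                lock.unlock();
            }
            try {
                Thread.sleep(FLUSH_TIMEOUT_MILLIS);
            } catch (final InterruptedException e) {
                break;
            }
        }
    }

    // Must be called with the lock held: write the buffered records downstream,
    // record the checkpoint token, then reset the buffer and flush timer.
    private void flushLocked() {
        buffer.clear();
        lastFlushTime = System.currentTimeMillis();
    }
}

The lock is what keeps the two writers from flushing the same records twice; without it, the background thread could flush while the main loop is mid-append after the blocking cursor call returns.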

}
} finally {
lock.unlock();
}

} else if(shouldTerminateChangeStream(operationType)){
stop();
partitionCheckpoint.resetCheckpoint();
Expand All @@ -240,7 +251,6 @@ record = document.getFullDocument().toJson(JSON_WRITER_SETTINGS);
LOG.warn("The change stream operation type {} is not handled", operationType);
}
} catch(Exception e){
// TODO handle documents with size > 10 MB.
// this will only happen if writing to buffer gets interrupted from shutdown,
// otherwise it's infinite backoff and retry
LOG.error("Failed to add records to buffer with error", e);
@@ -268,6 +278,11 @@ record = document.getFullDocument().toJson(JSON_WRITER_SETTINGS);

System.clearProperty(STOP_S3_SCAN_PROCESSING_PROPERTY);

// stop other threads for this worker
stop();

partitionCheckpoint.giveUpPartition();

// shutdown acknowledgement monitoring thread
if (streamAcknowledgementManager != null) {
streamAcknowledgementManager.shutdown();
@@ -304,17 +319,53 @@ private void writeToBuffer(final List<Event> records, final String checkPointTok
successItemsCounter.increment(records.size());
}

private void checkpointStream() {
private void writeToBuffer() {
LOG.debug("Write to buffer for line {} to {}", (recordCount - recordFlushBatchSize), recordCount);
writeToBuffer(records, checkPointToken, recordCount);
lastLocalCheckpoint = checkPointToken;
lastLocalRecordCount = recordCount;
lastBufferWriteTime = System.currentTimeMillis();
bytesProcessedSummary.record(recordBytes.stream().mapToLong(Long::longValue).sum());
records.clear();
recordBytes.clear();
}

private void bufferWriteAndCheckpointStream() {
long lastCheckpointTime = System.currentTimeMillis();
while (!Thread.currentThread().isInterrupted() && !stopWorker) {
if (lastLocalRecordCount != null && (System.currentTimeMillis() - lastCheckpointTime >= checkPointIntervalInMs)) {
LOG.debug("Perform regular checkpoint for resume token {} at record count {}", lastLocalCheckpoint, lastLocalRecordCount);
partitionCheckpoint.checkpoint(lastLocalCheckpoint, lastLocalRecordCount);
lastCheckpointTime = System.currentTimeMillis();
if (!records.isEmpty() && lastBufferWriteTime < Instant.now().minusMillis(BUFFER_WRITE_TIMEOUT_MILLIS).toEpochMilli()) {
lock.lock();
LOG.debug("Writing to buffer due to buffer write delay");
try {
writeToBuffer();
} catch(Exception e){
// this will only happen if writing to buffer gets interrupted from shutdown,
// otherwise it's infinite backoff and retry
LOG.error("Failed to add records to buffer with error", e);
failureItemsCounter.increment(records.size());
} finally {
lock.unlock();
}
}

if (!sourceConfig.isAcknowledgmentsEnabled()) {
if (System.currentTimeMillis() - lastCheckpointTime >= checkPointIntervalInMs) {
try {
lock.lock();
LOG.debug("Perform regular checkpoint for resume token {} at record count {}", lastLocalCheckpoint, lastLocalRecordCount);
partitionCheckpoint.checkpoint(lastLocalCheckpoint, lastLocalRecordCount);
} catch (Exception e) {
LOG.warn("Exception checkpointing the current state. The stream record processing will start from previous checkpoint.", e);
stop();
} finally {
lock.unlock();
}
lastCheckpointTime = System.currentTimeMillis();
}
}

try {
Thread.sleep(checkPointIntervalInMs);
Thread.sleep(BUFFER_WRITE_TIMEOUT_MILLIS);
} catch (InterruptedException ex) {
break;
}
@@ -62,4 +62,10 @@ public void resetCheckpoint_success() {
verify(streamProgressState).setLoadedRecords(0);
verify(streamProgressState).setLastUpdateTimestamp(anyLong());
}

@Test
public void extendLease_success() {
dataStreamPartitionCheckpoint.extendLease();
verify(enhancedSourceCoordinator).saveProgressStateForPartition(streamPartition, CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE);
}
}