KAFKA-428: Mark the copy complete in the source offset for the last copied document #168
@@ -169,6 +169,9 @@ void testSourceLoadsDataFromMongoClientWithCopyExisting() {
new HashMap<String, String>() {
  {
    put(MongoSourceConfig.STARTUP_MODE_CONFIG, StartupMode.COPY_EXISTING.propertyValue());
    String namespaceRegex =
        String.format("(%s\\.coll|%s\\.coll)", db1.getName(), db2.getName());
    put(MongoSourceConfig.COPY_EXISTING_NAMESPACE_REGEX_CONFIG, namespaceRegex);
[driveby] Without this filter, this test tries to copy all data.
// if isCopying is true, we want to set the COPY_KEY flag so that kafka has context that a
// copy is in progress. However, for the last document that we are copying, we should not set
// this flag because the copy has completed, otherwise we are relying on future change stream
// events to signify that we are no longer copying. We also need to set the _id field to be a
// valid resume token, which during copying exists in the cachedResumeToken variable.
boolean lastDocument = !batchIterator.hasNext();
boolean noMoreDataToCopy = copyDataManager != null && !copyDataManager.isCopying();
if (isCopying && lastDocument && noMoreDataToCopy) {
  sourceOffset.put(ID_FIELD, cachedResumeToken.toJson());
  sourceOffset.remove(COPY_KEY);
}
Highlighting this since this is the logic that fixes the issue.
Looking good! I did a first pass
() ->
assertTrue(
firstPoll.stream()
.map(SourceRecord::sourceOffset)
.allMatch(i -> i.containsKey("copy"))));
.limit(149)
[opt] Thoughts on replacing this magic number to make this test a little more readable?
Hmm, what do you think about changing this to 150 - 1 with a comment like // exclude the last record?
I extracted these numbers into variables, but I feel like it hurt the readability, especially for the other test.
Would that be the case even if all these numbers are generated from two variables, such as numInsertsPerNS=75 and numNamespaces=2? Either way, I'm fine with keeping it as is.
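For illustration only, here is roughly how the two suggested variables could drive the literal 149. This is a sketch, not code from the PR: the class name CopyTestCounts and the helpers fakeOffsets and allButLastAreCopying are hypothetical, and the offsets are plain maps standing in for SourceRecord::sourceOffset.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class CopyTestCounts {
  // Names follow the review suggestion; values match the literals in the test.
  static final int NUM_INSERTS_PER_NS = 75;
  static final int NUM_NAMESPACES = 2;
  static final int TOTAL_COPIED = NUM_INSERTS_PER_NS * NUM_NAMESPACES; // 150

  // Hypothetical stand-in for the offsets of one poll: every record carries
  // the "copy" flag except the last one.
  static List<Map<String, String>> fakeOffsets(int count) {
    List<Map<String, String>> offsets = new ArrayList<>();
    for (int i = 0; i < count; i++) {
      Map<String, String> offset = new HashMap<>();
      offset.put("_id", "token-" + i);
      if (i < count - 1) {
        offset.put("copy", "true");
      }
      offsets.add(offset);
    }
    return offsets;
  }

  // True when every offset except the last one carries the "copy" flag.
  static boolean allButLastAreCopying(List<Map<String, String>> offsets) {
    return offsets.stream()
        .limit(Math.max(0, offsets.size() - 1)) // exclude the last record
        .allMatch(o -> o.containsKey("copy"));
  }
}
```

With this shape the assertion reads as "all but the last record are still copying" instead of depending on the reader to recognise 149 as 150 - 1.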
assertTrue(
thirdPoll.stream()
.map(SourceRecord::sourceOffset)
.limit(24)
same optional comment here
sourceOffset.put(COPY_KEY, "true");
}
Iterator<BsonDocument> batchIterator = getNextBatch().iterator();
BsonDocument changeStreamDocument;
[nit] Do we need to define this outside the while loop?
Err nope 😆 I'll update
src/integrationTest/java/com/mongodb/kafka/connect/source/MongoSourceTaskIntegrationTest.java
boolean lastDocument = !batchIterator.hasNext();
boolean noMoreDataToCopy = copyDataManager != null && !copyDataManager.isCopying();
if (isCopying && lastDocument && noMoreDataToCopy) {
  sourceOffset.put(ID_FIELD, cachedResumeToken.toJson());
Great find! Should we add a test that confirms we're setting the expected resume token?
I think we got coverage of this with the recent e2e tests I added around restarting after copying. Let me know if you disagree.
boolean lastDocument = !batchIterator.hasNext();
boolean noMoreDataToCopy = copyDataManager != null && !copyDataManager.isCopying();
if (isCopying && lastDocument && noMoreDataToCopy) {
  sourceOffset.put(ID_FIELD, cachedResumeToken.toJson());
Instead of cachedResumeToken, it might be safer to create and use a different variable here that contains the same value as cachedResumeToken but isn't modified anywhere else. cachedResumeToken gets set to null (here) within getResumeToken, which ends up being called here if cursor == null. The cursor could be set to null if the cluster runs into a temporary connection issue.
Yeah that's a good catch, I'll create a new token called resumeTokenAfterCopy
Wait, I think we're actually good here. I'm happy to create a new variable for extra safety though if you're still concerned.
Because we are assigning this in the copied batch, the cursor == null code won't be hit until after we use and store the cachedResumeToken, i.e. the getNextBatch function exits early here.
Good point. I'm fine with keeping it as is as well.
@@ -544,7 +559,7 @@ void testCopyingExistingWithARestartMidwayThrough() {
put(MongoSourceConfig.COLLECTION_CONFIG, coll.getNamespace().getCollectionName());
put(MongoSourceConfig.STARTUP_MODE_CONFIG, StartupMode.COPY_EXISTING.propertyValue());
put(MongoSourceConfig.POLL_MAX_BATCH_SIZE_CONFIG, "25");
put(MongoSourceConfig.POLL_AWAIT_TIME_MS_CONFIG, "10000");
put(MongoSourceConfig.POLL_AWAIT_TIME_MS_CONFIG, "1000");
[driveby] Because this test expects an empty batch, we should reduce the poll await time even further; otherwise we're guaranteed to wait for at least 5x10000ms. This just speeds the test up.
assertNotNull(lastOffset.get("_id")); // check to make sure the value is an Object
@SuppressWarnings("unchecked")
Map<String, Object> mockedOffset = (Map<String, Object>) lastOffset;
A lot of this code is just to make the (Map<String, Object>) lastOffset cast safer. Since this is just a test, we could probably skip the null check.
assertNotNull(lastOffset.get("_id")); // check to make sure the value is an Object
@SuppressWarnings("unchecked")
Map<String, Object> mockedOffset = (Map<String, Object>) lastOffset;
when(offsetStorageReader.offset(any())).thenReturn(mockedOffset);
[driveby] I included mocking logic in this test because otherwise the code will always re-copy the data, not because it sees the expected copy offset but because the context is null.
// mock the context so that on restart we know where the last task left off
when(context.offsetStorageReader()).thenReturn(offsetStorageReader);
assertNotNull(lastOffset.get("_id")); // check to make sure the value is an Object
Same comment as the one above for mocking offset, we could remove this line if we want.
So the integration tests are failing for version 3.6 which I'm assuming has something to do with changestream/resumetoken support. I'm wondering if I should just remove this test (and potentially replace with newer versions of mongodb) cc: @arahmanan
EDIT: I added some logging in this patch and found that for version 3.6 the cachedResumeToken is initially stored as null. It might be worth including a null check in the if condition for this logic to avoid these scenarios.
Final comment before approving!
@@ -579,6 +617,71 @@ void testCopyingExistingWithARestartMidwayThrough() {
}
}

@Test
So the integration tests are failing for version 3.6 which I'm assuming has something to do with changestream/resumetoken support. I'm wondering if I should just remove this test (and potentially replace with newer versions of mongodb)
EDIT: I added some logging in this patch and found that for version 3.6 the cachedResumeToken is initially stored as null. It might be worth including a null check in the if condition for this logic to avoid these scenarios.
Moving this thread here. I agree with adding a null check. I don't expect customers to be using v3.6 in production, but it's still worth avoiding the null pointer exception. Out of curiosity, did you find any valuable details from the logs you've added?
Yeah, the logs showed that cachedResumeToken was being initialized as null. (Search for CALVIN: if you want to review the logs.)
Ok I just added the null check, I'll let the tests run but I do expect the test to still fail for version 3.6 because it doesn't have a resume token to set and resume from.
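For readers following along, this is roughly what the guarded logic looks like once the null check is folded into the condition. It is a self-contained sketch, not the connector's code: plain strings stand in for BsonDocument and cachedResumeToken.toJson(), the copyFinished parameter stands in for !copyDataManager.isCopying(), and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

final class CopyOffsetSketch {
  static final String ID_FIELD = "_id";
  static final String COPY_KEY = "copy";

  // Builds one source offset per document in the batch.
  // resumeTokenJson may be null, as was observed on MongoDB 3.6.
  static List<Map<String, String>> offsetsForBatch(
      List<String> documentIds, boolean isCopying, boolean copyFinished, String resumeTokenJson) {
    List<Map<String, String>> offsets = new ArrayList<>();
    Iterator<String> batchIterator = documentIds.iterator();
    while (batchIterator.hasNext()) {
      String documentId = batchIterator.next();
      Map<String, String> sourceOffset = new HashMap<>();
      sourceOffset.put(ID_FIELD, documentId);
      if (isCopying) {
        sourceOffset.put(COPY_KEY, "true"); // a copy is in progress
      }
      boolean lastDocument = !batchIterator.hasNext();
      // Mark the copy as ended only on the last copied document, and only
      // when there is a resume token to store.
      if (isCopying && lastDocument && copyFinished && resumeTokenJson != null) {
        sourceOffset.put(ID_FIELD, resumeTokenJson);
        sourceOffset.remove(COPY_KEY);
      }
      offsets.add(sourceOffset);
    }
    return offsets;
  }
}
```

In this model, a null token leaves the copy flag on the last record, which avoids the NullPointerException but is also consistent with the 3.6 test still failing: there is no resume token to store and resume from.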
Yep they fail in 3.6 for that reason still. https://spruce.mongodb.com/task/mongo_kafka_integration_tests__version~3.6_os~ubuntu_topology~replicaset_javaVersion~JDK17_integration_test_task_patch_d477f00c5506ac461ebc86cbeaaff291598d921c_671653eae106f80007f4ba73_24_10_21_13_15_23?execution=0&sortBy=STATUS&sortDir=ASC
LGTM!
This fixes an issue that was occurring when:
COPY_EXISTING is used
The resulting behavior of this scenario is that the copy would occur again on restart.
This was happening because sourceOffsets for copied records specify copy: true, so the restart thinks that the copy was in progress and it tries again. (Note that a restart that occurs during a copy is expected to reattempt the copy, which will duplicate data.)
Here is an example of what that copy offset looks like:
It does look like we have logic which appears to try to "mark copying ended", but this doesn't work when the cachedResult is null. (I'm not actually sure if this logic ever worked, but I could see a future where the cachedResult concept is removed. I haven't thought about this enough to form a strong opinion yet.)
The fix for this issue was to identify when we are creating the sourceOffset for the last copied document and, in that case, go ahead and "mark copying ended" by removing the copy: true flag and setting the '_id' field to the cachedResumeToken.
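To make the failure mode concrete, here is a deliberately simplified model of the restart decision described above. It is an assumption-laden sketch rather than the connector's implementation: the class and method names are hypothetical, and treating a missing stored offset as "copy from scratch" is an assumption based on the review discussion.

```java
import java.util.HashMap;
import java.util.Map;

final class RestartDecisionSketch {
  static final String COPY_KEY = "copy";

  // Returns true when a restart would run the copy again.
  // Assumption: no stored offset means the copy has never run, and an offset
  // that still carries the copy flag means the copy looks unfinished.
  static boolean wouldCopyAgain(Map<String, ?> storedOffset) {
    return storedOffset == null || storedOffset.containsKey(COPY_KEY);
  }

  public static void main(String[] args) {
    Map<String, String> beforeFix = new HashMap<>();
    beforeFix.put("_id", "last-copied-document");
    beforeFix.put(COPY_KEY, "true"); // last copied record still flagged

    Map<String, String> afterFix = new HashMap<>();
    afterFix.put("_id", "resume-token-json"); // flag removed, token stored

    System.out.println(wouldCopyAgain(beforeFix));
    System.out.println(wouldCopyAgain(afterFix));
  }
}
```

Under this model, the last copied record's offset decides everything when no change stream events follow the copy, which is why the fix targets that one record.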