-
Notifications
You must be signed in to change notification settings - Fork 14.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-16709: abortAndPauseCleaning only when future log is not existed #15951
Changes from 1 commit
57fd500
e644536
4a1b76d
3e6a880
d913971
9a6da74
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2116,14 +2116,13 @@ class ReplicaManager(val config: KafkaConfig, | |
|
||
// Add future replica log to partition's map if it's not existed | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if it's not existed => if it doesn't exist |
||
if (partition.maybeCreateFutureReplica(futureLog.parentDir, offsetCheckpoints, topicIds(partition.topic))) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sincre this section is inside a block of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Yes, normally, when we created future log is in this path:
So in the end, we'll have future log added in both But it's possible that the future log only exists in That means, we have to do the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Given that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We need a way to determine if That is, Sorry, maybe I didn't get your question here. Could you explain again if I misunderstand it? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The scenario I was thinking of is when the broker starts up, But you're right, my confusion here was with where maybeCreateFutureReplica checks if the futureLog already exists, it checks in itself (Partition) not in LogManager, so this makes sense. |
||
val futureLogInPartition = futureLocalLogOrException(topicPartition) | ||
// pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move | ||
// replica from source dir to destination dir | ||
logManager.abortAndPauseCleaning(topicPartition) | ||
|
||
futureReplicasAndInitialOffset.put(topicPartition, InitialFetchState(topicIds(topicPartition.topic), leader, | ||
partition.getLeaderEpoch, futureLogInPartition.highWatermark)) | ||
} | ||
|
||
futureReplicasAndInitialOffset.put(topicPartition, InitialFetchState(topicIds(topicPartition.topic), leader, | ||
partition.getLeaderEpoch, futureLog.highWatermark)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks to the failing test, I found I was wrong. We should always add the partition into fetch thread no matter we created the future log or not since before |
||
} | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If
maybeCreateFutureReplica
return false, we assume another thread already add the future log to partition and invoke alter thread. Hence, we don't need to abort cleaning since another thread does it.However, adding alter thread (
replicaAlterLogDirsManager.addFetcherForPartitions(futureReplicasAndInitialOffset)
) is not in this check. Is it possible that alter thread, which is invoked by another thread, just remove the future log and then this thread add the topic partition toreplicaAlterLogDirsManager
? It seems to me that alter thread will get fail as future log of partition is gone.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's possible. But I think that's fine because the removal of future log could because:
ReplicaManager#makeLeader or makeFollower
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh, my previous comments is incorrect. Both
alterReplicaLogDirs
andmaybeAddLogDirFetchers
are inreplicaStateChangeLock
, so the race condition I described should not happen.However, I'm thinking whether it is fine to add alter thread by
maybeAddLogDirFetchers
even though the future log of partition is already created by another thread. Although no new alter thread will be created asBrokerIdAndFetcherId
is identical.In short,
alterReplicaLogDirs
adds alter thread [0] only if it succeeds to create future log of partition. MaybemaybeAddLogDirFetchers
should follow same rule? Or we can add comments to say "that is fine asreplicaAlterLogDirsManager.addFetcherForPartitions
will be a no-op in this case?[0]
kafka/core/src/main/scala/kafka/server/ReplicaManager.scala
Line 1198 in 5552f5c
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@chia7712 , thanks for the comment.
For this:
I chose latter option because if we only create fetcher when future log is inexisted, it might cause potential side effect that this fetcher is removed when leadership change, but not get added later. I've added the comment in this commit: 0d78e49 . Thanks.