Skip to content

Commit

Permalink
KAFKA-16709: fix broken tests and update the test
Browse files Browse the repository at this point in the history
  • Loading branch information
showuon committed May 17, 2024
1 parent e644536 commit 9cdfd73
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 13 deletions.
7 changes: 3 additions & 4 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2116,14 +2116,13 @@ class ReplicaManager(val config: KafkaConfig,

// Add future replica log to partition's map if it's not existed
if (partition.maybeCreateFutureReplica(futureLog.parentDir, offsetCheckpoints, topicIds(partition.topic))) {
val futureLogInPartition = futureLocalLogOrException(topicPartition)
// pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move
// replica from source dir to destination dir
logManager.abortAndPauseCleaning(topicPartition)

futureReplicasAndInitialOffset.put(topicPartition, InitialFetchState(topicIds(topicPartition.topic), leader,
partition.getLeaderEpoch, futureLogInPartition.highWatermark))
}

futureReplicasAndInitialOffset.put(topicPartition, InitialFetchState(topicIds(topicPartition.topic), leader,
partition.getLeaderEpoch, futureLog.highWatermark))
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions core/src/test/resources/log4j.properties
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
log4j.rootLogger=OFF, stdout
log4j.rootLogger=INFO, stdout

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n

log4j.logger.kafka=WARN
log4j.logger.org.apache.kafka=WARN
log4j.logger.kafka=INFO
log4j.logger.org.apache.kafka=INFO


# zkclient can be verbose, during debugging it is common to adjust it separately
Expand Down
17 changes: 11 additions & 6 deletions core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -343,9 +343,9 @@ class ReplicaManagerTest {
try {
val partition = rm.createPartition(tp0)
partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints), None)
new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints), Option.apply(uuid))

rm.becomeLeaderOrFollower(0, new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
val response = rm.becomeLeaderOrFollower(0, new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
Seq(new LeaderAndIsrPartitionState()
.setTopicName(topic)
.setPartitionIndex(0)
Expand All @@ -356,8 +356,12 @@ class ReplicaManagerTest {
.setPartitionEpoch(0)
.setReplicas(Seq[Integer](0).asJava)
.setIsNew(false)).asJava,
Collections.singletonMap(topic, Uuid.randomUuid()),
Collections.singletonMap(topic, uuid),
Set(new Node(0, "host1", 0)).asJava).build(), (_, _) => ())
// expect the errorCounts only has 1 entry with Errors.NONE
val errorCounts = response.errorCounts()
assertEquals(1, response.errorCounts().size())
assertNotNull(errorCounts.get(Errors.NONE))
spyLogManager.maybeUpdatePreferredLogDir(tp0, dir2.getAbsolutePath)

if (futureLogCreated) {
Expand Down Expand Up @@ -505,11 +509,10 @@ class ReplicaManagerTest {
}

private[this] def testFencedErrorCausedByBecomeLeader(loopEpochChange: Int): Unit = {
val topicPartition = new TopicPartition(topic, 0)
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time))
replicaManager.replicaFetcherManager.failedPartitions.removeAll(Set.apply(topicPartition))
try {
val brokerList = Seq[Integer](0, 1).asJava
val topicPartition = new TopicPartition(topic, 0)
replicaManager.createPartition(topicPartition)
.createLogIfNotExists(isNew = false, isFutureReplica = false,
new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints), None)
Expand Down Expand Up @@ -540,7 +543,9 @@ class ReplicaManagerTest {
// make sure the future log is created
replicaManager.futureLocalLogOrException(topicPartition)
assertEquals(1, replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.size)
(1 to loopEpochChange).foreach(epoch => replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(epoch), (_, _) => ()))
(1 to loopEpochChange).foreach(epoch => {
replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(epoch), (_, _) => ())
})
// wait for the ReplicaAlterLogDirsThread to complete
TestUtils.waitUntilTrue(() => {
replicaManager.replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
Expand Down

0 comments on commit 9cdfd73

Please sign in to comment.