Skip to content

Commit

Permalink
fix(metarepos): new topic should start from MinGLSN
Browse files Browse the repository at this point in the history
  • Loading branch information
hungryjang committed Apr 27, 2023
1 parent 9f64785 commit d1ae8c8
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 1 deletion.
42 changes: 42 additions & 0 deletions internal/metarepos/raft_metadata_repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -720,6 +720,14 @@ func TestMRGlobalCommit(t *testing.T) {
return hwm == types.GLSN(9)
}), ShouldBeTrue)
})

Convey("The LastHighWatermark of the new topic should be InvalidGLSN", func(ctx C) {
hwm, _ := mr.GetLastCommitResults().LastHighWatermark(topicID, -1)
So(hwm, ShouldNotEqual, types.InvalidGLSN)

hwm, _ = mr.GetLastCommitResults().LastHighWatermark(topicID+1, -1)
So(hwm, ShouldEqual, types.InvalidGLSN)
})
})
})
}
Expand Down Expand Up @@ -2750,6 +2758,39 @@ func TestMRTopicLastHighWatermark(t *testing.T) {
So(mr.numCommitSince(topicID, lsID, base, latest, -1), ShouldEqual, 2)
}
}

Convey("when topic added, glsn of new topic should start from 1", func(ctx C) {
err := mr.storage.registerTopic(&varlogpb.TopicDescriptor{TopicID: topicID})
So(err, ShouldBeNil)

lsIds := make([]types.LogStreamID, nrLS)
for i := range lsIds {
ls := makeLogStream(topicID, lsID, snIDs)
err := mr.storage.registerLogStream(ls)
So(err, ShouldBeNil)

lsIds[i] = lsID
lsID++
}
topicLogStreamID[topicID] = lsIds

preVersion := mr.storage.GetLastCommitVersion()

for _, lsID := range lsIds {
for i := 0; i < rep; i++ {
So(testutil.CompareWaitN(10, func() bool {
report := makeUncommitReport(snIDs[i], preVersion, types.InvalidGLSN, lsID, types.MinLLSN, uint64(2+i))
return mr.proposeReport(report.StorageNodeID, report.UncommitReport) == nil
}), ShouldBeTrue)
}
}

// global commit (2, 2) highest glsn: 4
So(testutil.CompareWaitN(10, func() bool {
hwm, _ := mr.GetLastCommitResults().LastHighWatermark(topicID, -1)
return hwm == types.GLSN(4)
}), ShouldBeTrue)
})
})

Convey("add logStream into topic", func(ctx C) {
Expand Down Expand Up @@ -2825,6 +2866,7 @@ func TestMRTopicLastHighWatermark(t *testing.T) {
}
}
})

})
}

Expand Down
2 changes: 1 addition & 1 deletion proto/mrpb/raft_metadata_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (crs *LogStreamCommitResults) LastHighWatermark(topicID types.TopicID, hint
return crs.CommitResults[i].TopicID >= topicID+1
})

if i > 0 {
if i > 0 && crs.GetCommitResults()[i-1].TopicID == topicID {
return crs.GetCommitResults()[i-1].GetHighWatermark(), i - 1
}

Expand Down

0 comments on commit d1ae8c8

Please sign in to comment.