Skip to content

Commit

Permalink
Fix TestJetStreamClusterConsumeWithStartSequence to account for cli…
Browse files Browse the repository at this point in the history
…pping of `OptStartSeq`

Signed-off-by: Neil Twigg <neil@nats.io>
  • Loading branch information
neilalexander committed Oct 17, 2024
1 parent 317f223 commit 14c46af
Showing 1 changed file with 21 additions and 17 deletions.
38 changes: 21 additions & 17 deletions server/jetstream_cluster_4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3013,6 +3013,8 @@ func TestJetStreamClusterConsumeWithStartSequence(t *testing.T) {
// This is the success condition for all sub-tests below
var ExpectedMsgId = ""
checkMessage := func(t *testing.T, msg *nats.Msg) {
t.Helper()

msgMeta, err := msg.Metadata()
require_NoError(t, err)

Expand All @@ -3025,6 +3027,8 @@ func TestJetStreamClusterConsumeWithStartSequence(t *testing.T) {
}

checkRawMessage := func(t *testing.T, msg *nats.RawStreamMsg) {
t.Helper()

// Check sequence number
require_Equal(t, msg.Sequence, ChosenSeq)

Expand Down Expand Up @@ -3058,7 +3062,23 @@ func TestJetStreamClusterConsumeWithStartSequence(t *testing.T) {
})
require_NoError(t, err)

// Setup: create subscriptions before stream is populated
// Setup: populate stream
buf := make([]byte, 100)
for i := uint64(1); i <= NumMessages; i++ {
msgId := nuid.Next()
pubAck, err := js.Publish(StreamSubjectPrefix+strconv.Itoa(int(i)), buf, nats.MsgId(msgId))
require_NoError(t, err)

// Verify assumption made in tests below
require_Equal(t, pubAck.Sequence, i)

if i == ChosenSeq {
// Save the expected message id for the chosen message
ExpectedMsgId = msgId
}
}

// Setup: create subscriptions, needs to be after stream creation or OptStartSeq could be clipped
var preCreatedSub, preCreatedSubDurable *nats.Subscription
{
preCreatedSub, err = js.PullSubscribe(
Expand Down Expand Up @@ -3094,22 +3114,6 @@ func TestJetStreamClusterConsumeWithStartSequence(t *testing.T) {
}()
}

// Setup: populate stream
buf := make([]byte, 100)
for i := uint64(1); i <= NumMessages; i++ {
msgId := nuid.Next()
pubAck, err := js.Publish(StreamSubjectPrefix+strconv.Itoa(int(i)), buf, nats.MsgId(msgId))
require_NoError(t, err)

// Verify assumption made in tests below
require_Equal(t, pubAck.Sequence, i)

if i == ChosenSeq {
// Save the expected message id for the chosen message
ExpectedMsgId = msgId
}
}

// Tests various ways to consume the stream starting at the ChosenSeq sequence

t.Run(
Expand Down

0 comments on commit 14c46af

Please sign in to comment.