Skip to content

Commit 0971e09

Browse files
Correct TestJetStreamClusterConsumerDeliveredSyncReporting
Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
1 parent d3e9e79 commit 0971e09

File tree

1 file changed

+27
-3
lines changed

1 file changed

+27
-3
lines changed

server/jetstream_cluster_1_test.go

+27-3
Original file line numberDiff line numberDiff line change
@@ -6093,24 +6093,48 @@ func TestJetStreamClusterConsumerDeliveredSyncReporting(t *testing.T) {
60936093
require_NoError(t, err)
60946094
}
60956095

6096+
opts := &JSzOptions{Accounts: true, Streams: true, Consumer: true}
6097+
for _, s := range c.servers {
6098+
jsz, err := s.Jsz(opts)
6099+
require_NoError(t, err)
6100+
ci := jsz.AccountDetails[0].Streams[0].Consumer[0]
6101+
require_Equal(t, ci.Delivered.Consumer, 0)
6102+
require_Equal(t, ci.Delivered.Stream, 0)
6103+
}
6104+
60966105
msgs, err := sub.Fetch(1)
60976106
require_NoError(t, err)
60986107
require_Equal(t, len(msgs), 1)
6108+
meta, err := msgs[0].Metadata()
6109+
require_NoError(t, err)
6110+
require_Equal(t, meta.Sequence.Consumer, 1)
6111+
require_Equal(t, meta.Sequence.Stream, 1)
6112+
6113+
// Allow some time for the state to propagate.
6114+
maxWait := 200 * time.Millisecond
6115+
time.Sleep(maxWait)
6116+
6117+
for _, s := range c.servers {
6118+
jsz, err := s.Jsz(opts)
6119+
require_NoError(t, err)
6120+
ci := jsz.AccountDetails[0].Streams[0].Consumer[0]
6121+
require_Equal(t, ci.Delivered.Consumer, 1)
6122+
require_Equal(t, ci.Delivered.Stream, 1)
6123+
}
60996124

61006125
// Now we want to make sure that jsz reporting will show the same
61016126
// state, including delivered, which will have skipped to the end.
61026127
// The skip can happen on several factors, but for here we just send
61036128
// another pull request which we will let fail.
6104-
_, err = sub.Fetch(1, nats.MaxWait(200*time.Millisecond))
6129+
_, err = sub.Fetch(1, nats.MaxWait(maxWait))
61056130
require_Error(t, err)
61066131

6107-
opts := &JSzOptions{Accounts: true, Streams: true, Consumer: true}
61086132
for _, s := range c.servers {
61096133
jsz, err := s.Jsz(opts)
61106134
require_NoError(t, err)
61116135
ci := jsz.AccountDetails[0].Streams[0].Consumer[0]
61126136
require_Equal(t, ci.Delivered.Consumer, 1)
6113-
require_Equal(t, ci.Delivered.Stream, 1)
6137+
require_Equal(t, ci.Delivered.Stream, 11)
61146138
}
61156139
}
61166140

0 commit comments

Comments
 (0)