Cherry-picks for 2.10.22-RC.3 #6012

Merged · 8 commits · Oct 16, 2024
4 changes: 2 additions & 2 deletions .github/workflows/cov.yaml
@@ -34,8 +34,8 @@ jobs:

- name: Convert coverage.out to coverage.lcov
# Use commit hash here to avoid a re-tagging attack, as this is a third-party action
# Commit c680c0f7c7442485f1749eb2a13e54a686e76eb5 = tag v1.0.9
uses: jandelgado/gcov2lcov-action@c680c0f7c7442485f1749eb2a13e54a686e76eb5
# Commit 69ef3d59a24cc6e062516a73d8be123e85b15cc0 = tag v1.1.0
uses: jandelgado/gcov2lcov-action@69ef3d59a24cc6e062516a73d8be123e85b15cc0
with:
infile: acc.out
working-directory: src/github.com/nats-io/nats-server
2 changes: 1 addition & 1 deletion go.mod
@@ -3,7 +3,7 @@ module github.com/nats-io/nats-server/v2
go 1.21.0

require (
github.com/klauspost/compress v1.17.10
github.com/klauspost/compress v1.17.11
github.com/minio/highwayhash v1.0.3
github.com/nats-io/jwt/v2 v2.5.8
github.com/nats-io/nats.go v1.36.0
4 changes: 2 additions & 2 deletions go.sum
@@ -1,7 +1,7 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/klauspost/compress v1.17.10 h1:oXAz+Vh0PMUvJczoi+flxpnBEPxoER1IaAnU/NMPtT0=
github.com/klauspost/compress v1.17.10/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc=
github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q=
github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ=
github.com/nats-io/jwt/v2 v2.5.8 h1:uvdSzwWiEGWGXf+0Q+70qv6AQdvcvxrv9hPM0RiPamE=
51 changes: 45 additions & 6 deletions server/consumer.go
@@ -279,7 +279,9 @@ var (
AckNext = []byte("+NXT")
// Terminate delivery of the message.
AckTerm = []byte("+TERM")
)

const (
// reasons to supply when terminating messages using limits
ackTermLimitsReason = "Message deleted by stream limits"
ackTermUnackedLimitsReason = "Unacknowledged message was deleted"
@@ -1290,6 +1292,9 @@ func (o *consumer) setLeader(isLeader bool) {
pullMode := o.isPullMode()
o.mu.Unlock()

// Check if there are any pending we might need to clean up etc.
o.checkPending()

// Snapshot initial info.
o.infoWithSnap(true)

@@ -1687,12 +1692,39 @@ func (o *consumer) config() ConsumerConfig {
return o.cfg
}

// Check if we have hit max deliveries. If so do notification and cleanup.
// Return whether or not the max was hit.
// Lock should be held.
func (o *consumer) hasMaxDeliveries(seq uint64) bool {
if o.maxdc == 0 {
return false
}
if dc := o.deliveryCount(seq); dc >= o.maxdc {
// We have hit our max deliveries for this sequence.
// Only send the advisory once.
if dc == o.maxdc {
o.notifyDeliveryExceeded(seq, dc)
}
// Determine if we signal to start flow of messages again.
if o.maxp > 0 && len(o.pending) >= o.maxp {
o.signalNewMessages()
}
// Cleanup our tracking.
delete(o.pending, seq)
if o.rdc != nil {
delete(o.rdc, seq)
}
return true
}
return false
}

// Force expiration of all pending.
// Lock should be held.
func (o *consumer) forceExpirePending() {
var expired []uint64
for seq := range o.pending {
if !o.onRedeliverQueue(seq) {
if !o.onRedeliverQueue(seq) && !o.hasMaxDeliveries(seq) {
expired = append(expired, seq)
}
}
@@ -3416,6 +3448,14 @@ func trackDownAccountAndInterest(acc *Account, interest string) (*Account, strin
return acc, interest
}

// Return current delivery count for a given sequence.
func (o *consumer) deliveryCount(seq uint64) uint64 {
if o.rdc == nil {
return 1
}
return o.rdc[seq]
}

// Increase the delivery count for this message.
// ONLY used on redelivery semantics.
// Lock should be held.
@@ -3581,9 +3621,7 @@ func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) {
var pmsg = getJSPubMsgFromPool()

// Grab next message applicable to us.
// We will unlock here in case lots of contention, e.g. WQ.
filters, subjf, fseq := o.filters, o.subjf, o.sseq
o.mu.Unlock()
// Check if we are multi-filtered or not.
if filters != nil {
sm, sseq, err = store.LoadNextMsgMulti(filters, fseq, &pmsg.StoreMsg)
@@ -3598,8 +3636,6 @@ func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) {
pmsg.returnToPool()
pmsg = nil
}
// Re-acquire lock.
o.mu.Lock()
// Check if we should move our o.sseq.
if sseq >= o.sseq {
// If we are moving step by step then sseq == o.sseq.
@@ -4630,7 +4666,10 @@ func (o *consumer) checkPending() {
}
}
if elapsed >= deadline {
if !o.onRedeliverQueue(seq) {
// We will check if we have hit our max deliveries. Previously we would do this on getNextMsg() which
// worked well for push consumers, but with pull based consumers would require a new pull request to be
// present to process and redelivered could be reported incorrectly.
if !o.onRedeliverQueue(seq) && !o.hasMaxDeliveries(seq) {
expired = append(expired, seq)
}
} else if deadline-elapsed < next {
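Reviewer note: moving the max-deliveries check into checkPending() (via the new hasMaxDeliveries helper) means a pull consumer no longer needs an outstanding pull request for terminal redelivery accounting to settle. A minimal client-side sketch of the scenario, mirroring the new TestJetStreamClusterMaxDeliveriesOnInterestStreams added below — it assumes a local JetStream-enabled server at nats.DefaultURL, the stream/consumer names are illustrative, and error handling is elided for brevity:

```go
package main

import (
	"fmt"
	"time"

	"github.com/nats-io/nats.go"
)

func main() {
	nc, _ := nats.Connect(nats.DefaultURL)
	defer nc.Close()
	js, _ := nc.JetStream()

	js.AddStream(&nats.StreamConfig{
		Name:      "TEST",
		Subjects:  []string{"foo.*"},
		Retention: nats.InterestPolicy,
	})

	// MaxDeliver(1): a single unacked delivery exhausts the message.
	sub, _ := js.PullSubscribe("foo.*", "c1",
		nats.AckWait(10*time.Millisecond), nats.MaxDeliver(1))

	js.Publish("foo.bar", []byte("HELLO"))

	// First (and only allowed) delivery; deliberately not acked.
	msgs, _ := sub.Fetch(1)
	fmt.Println("delivered:", len(msgs))

	// Let the ack wait expire without issuing another Fetch.
	time.Sleep(250 * time.Millisecond)

	// With this change the consumer settles on its own:
	// expect NumAckPending, NumRedelivered and NumPending all 0.
	ci, _ := js.ConsumerInfo("TEST", "c1")
	fmt.Println(ci.NumAckPending, ci.NumRedelivered, ci.NumPending)
}
```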
50 changes: 26 additions & 24 deletions server/jetstream_api.go
@@ -2794,33 +2794,35 @@ func (s *Server) jsLeaderStepDownRequest(sub *subscription, c *client, _ *Accoun
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if len(req.Placement.Tags) > 0 {
// Tags currently not supported.
resp.Error = NewJSClusterTagsError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
cn := req.Placement.Cluster
var peers []string
ourID := cc.meta.ID()
for _, p := range cc.meta.Peers() {
if si, ok := s.nodeToInfo.Load(p.ID); ok && si != nil {
if ni := si.(nodeInfo); ni.offline || ni.cluster != cn || p.ID == ourID {
continue
if req.Placement != nil {
if len(req.Placement.Tags) > 0 {
// Tags currently not supported.
resp.Error = NewJSClusterTagsError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
cn := req.Placement.Cluster
var peers []string
ourID := cc.meta.ID()
for _, p := range cc.meta.Peers() {
if si, ok := s.nodeToInfo.Load(p.ID); ok && si != nil {
if ni := si.(nodeInfo); ni.offline || ni.cluster != cn || p.ID == ourID {
continue
}
peers = append(peers, p.ID)
}
peers = append(peers, p.ID)
}
if len(peers) == 0 {
resp.Error = NewJSClusterNoPeersError(fmt.Errorf("no replacement peer connected"))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
// Randomize and select.
if len(peers) > 1 {
rand.Shuffle(len(peers), func(i, j int) { peers[i], peers[j] = peers[j], peers[i] })
}
preferredLeader = peers[0]
}
if len(peers) == 0 {
resp.Error = NewJSClusterNoPeersError(fmt.Errorf("no replacement peer connected"))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
// Randomize and select.
if len(peers) > 1 {
rand.Shuffle(len(peers), func(i, j int) { peers[i], peers[j] = peers[j], peers[i] })
}
preferredLeader = peers[0]
}

// Call actual stepdown.
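Reviewer note: the added `if req.Placement != nil` guard covers a meta leader stepdown request whose body carries no placement, which the old code dereferenced unconditionally. A minimal sketch of exercising that path, assuming the standard $JS.API.META.LEADER.STEPDOWN API subject and a connection authorized on the system account (URL and credentials file are placeholders):

```go
package main

import (
	"fmt"
	"time"

	"github.com/nats-io/nats.go"
)

func main() {
	// Placeholder URL/creds; must be a system-account user.
	nc, err := nats.Connect("nats://127.0.0.1:4222", nats.UserCredentials("sys.creds"))
	if err != nil {
		panic(err)
	}
	defer nc.Close()

	// An empty JSON body leaves Placement nil on the server side —
	// the case the new nil check now handles instead of dereferencing it.
	resp, err := nc.Request("$JS.API.META.LEADER.STEPDOWN", []byte(`{}`), 2*time.Second)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(resp.Data))
}
```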
7 changes: 4 additions & 3 deletions server/jetstream_cluster.go
@@ -2266,7 +2266,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
// from underneath the one that is running since it will be the same raft node.
defer func() {
// We might be closing during shutdown, don't pre-emptively stop here since we'll still want to install snapshots.
if !mset.closed.Load() {
if mset != nil && !mset.closed.Load() {
n.Stop()
}
}()
@@ -2851,7 +2851,7 @@ func (mset *stream) resetClusteredState(err error) bool {
}

if node != nil {
if err == errCatchupTooManyRetries {
if errors.Is(err, errCatchupAbortedNoLeader) || err == errCatchupTooManyRetries {
// Don't delete all state, could've just been temporarily unable to reach the leader.
node.Stop()
} else {
@@ -8186,6 +8186,7 @@ var (
errCatchupStreamStopped = errors.New("stream has been stopped") // when a catchup is terminated due to the stream going away.
errCatchupBadMsg = errors.New("bad catchup msg")
errCatchupWrongSeqForSkip = errors.New("wrong sequence for skipped msg")
errCatchupAbortedNoLeader = errors.New("catchup aborted, no leader")
errCatchupTooManyRetries = errors.New("catchup failed, too many retries")
)

@@ -8289,7 +8290,7 @@ RETRY:
releaseSyncOutSem()

if n.GroupLeader() == _EMPTY_ {
return fmt.Errorf("catchup for stream '%s > %s' aborted, no leader", mset.account(), mset.name())
return fmt.Errorf("%w for stream '%s > %s'", errCatchupAbortedNoLeader, mset.account(), mset.name())
}

// If we have a sub clear that here.
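Reviewer note: the catchup "no leader" failure is now a named sentinel wrapped with %w, so resetClusteredState can match it with errors.Is and stop the Raft node without wiping state. A small stand-alone sketch of that wrapping pattern (names are illustrative, not the server's own):

```go
package main

import (
	"errors"
	"fmt"
)

// Sentinel in the style of errCatchupAbortedNoLeader (name illustrative).
var errAbortedNoLeader = errors.New("catchup aborted, no leader")

// Wrapping with %w keeps the sentinel recoverable from the formatted message.
func catchup(account, stream string) error {
	return fmt.Errorf("%w for stream '%s > %s'", errAbortedNoLeader, account, stream)
}

func main() {
	err := catchup("$G", "TEST")
	fmt.Println(err)                                // catchup aborted, no leader for stream '$G > TEST'
	fmt.Println(errors.Is(err, errAbortedNoLeader)) // true: stop the node, keep state
}
```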
58 changes: 58 additions & 0 deletions server/jetstream_cluster_1_test.go
@@ -6440,6 +6440,64 @@ func TestJetStreamClusterMetaStepdownFromNonSysAccount(t *testing.T) {
require_NotEqual(t, ml, c.leader())
}

func TestJetStreamClusterMaxDeliveriesOnInterestStreams(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

// Client based API
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo.*"},
Retention: nats.InterestPolicy,
})
require_NoError(t, err)

sub1, err := js.PullSubscribe("foo.*", "c1", nats.AckWait(10*time.Millisecond), nats.MaxDeliver(1))
require_NoError(t, err)

sub2, err := js.PullSubscribe("foo.*", "c2", nats.AckWait(10*time.Millisecond), nats.MaxDeliver(1))
require_NoError(t, err)

js.Publish("foo.bar", []byte("HELLO"))

si, err := js.StreamInfo("TEST")
require_NoError(t, err)
require_Equal(t, si.State.Msgs, 1)

msgs, err := sub1.Fetch(1)
require_NoError(t, err)
require_Equal(t, len(msgs), 1)

msgs, err = sub2.Fetch(1)
require_NoError(t, err)
require_Equal(t, len(msgs), 1)

// Wait for redelivery to both consumers which will do nothing.
time.Sleep(250 * time.Millisecond)

// Now check that stream and consumer infos are correct.
si, err = js.StreamInfo("TEST")
require_NoError(t, err)
// Messages that are skipped due to max deliveries should NOT remove messages.
require_Equal(t, si.State.Msgs, 1)
require_Equal(t, si.State.Consumers, 2)

for _, cname := range []string{"c1", "c2"} {
ci, err := js.ConsumerInfo("TEST", cname)
require_NoError(t, err)
require_Equal(t, ci.Delivered.Consumer, 1)
require_Equal(t, ci.Delivered.Stream, 1)
require_Equal(t, ci.AckFloor.Consumer, 1)
require_Equal(t, ci.AckFloor.Stream, 1)
require_Equal(t, ci.NumAckPending, 0)
require_Equal(t, ci.NumRedelivered, 0)
require_Equal(t, ci.NumPending, 0)
}
}

//
// DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times)
// Add at the end of jetstream_cluster_<n>_test.go, with <n> being the highest value.