Cherry-picks for 2.10.21-RC.4 (#5928)
Includes:

- #5925
- #5926
- #5927

Signed-off-by: Neil Twigg <neil@nats.io>
neilalexander authored Sep 25, 2024
2 parents 58fba00 + 098b4f8 commit b39694d
Showing 4 changed files with 209 additions and 8 deletions.
7 changes: 4 additions & 3 deletions server/events.go
@@ -97,6 +97,7 @@ const (

// FIXME(dlc) - make configurable.
var eventsHBInterval = 30 * time.Second
var statsHBInterval = 10 * time.Second

// Default minimum wait time for sending statsz
const defaultStatszRateLimit = 1 * time.Second
@@ -943,9 +944,9 @@ func (s *Server) sendStatsz(subj string) {
Peer: getHash(leader),
Size: mg.ClusterSize(),
}
if ipq := s.jsAPIRoutedReqs; ipq != nil {
jStat.Meta.Pending = ipq.len()
}
}
if ipq := s.jsAPIRoutedReqs; ipq != nil && jStat.Meta != nil {
jStat.Meta.Pending = ipq.len()
}
}
m.Stats.JetStream = jStat
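
For context, the statsz payload carrying this Meta.Pending counter is published on the per-server statistics subject. The sketch below shows how a system-account client might read it; the subject format, credentials, server ID and JSON field names are assumptions made for illustration (mirroring the test added further down in this commit), not something defined by this change.

package main

import (
	"encoding/json"
	"fmt"
	"log"
	"time"

	"github.com/nats-io/nats.go"
)

// Only the fields of interest from the statsz payload; the JSON tags here are
// assumptions for this sketch.
type statszMsg struct {
	Stats struct {
		JetStream *struct {
			Meta *struct {
				Pending int `json:"pending"`
			} `json:"meta"`
		} `json:"jetstream"`
	} `json:"statsz"`
}

func main() {
	// Hypothetical system-account credentials.
	nc, err := nats.Connect(nats.DefaultURL, nats.UserInfo("admin", "s3cr3t!"))
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	serverID := "NC..." // hypothetical ID of the server (e.g. the meta leader) to observe
	sub, err := nc.SubscribeSync(fmt.Sprintf("$SYS.SERVER.%s.STATSZ", serverID))
	if err != nil {
		log.Fatal(err)
	}

	// With the change above, statsz heartbeats go out every 10s instead of 30s.
	msg, err := sub.NextMsg(15 * time.Second)
	if err != nil {
		log.Fatal(err)
	}

	var m statszMsg
	if err := json.Unmarshal(msg.Data, &m); err != nil {
		log.Fatal(err)
	}
	// Guard against a nil Meta, just as the fixed server code now does.
	if js := m.Stats.JetStream; js != nil && js.Meta != nil {
		fmt.Println("pending routed JetStream API requests:", js.Meta.Pending)
	}
}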
69 changes: 65 additions & 4 deletions server/jetstream_cluster.go
@@ -6336,8 +6336,9 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su
}

if isReplicaChange {
isScaleUp := newCfg.Replicas > len(rg.Peers)
// We are adding new peers here.
if newCfg.Replicas > len(rg.Peers) {
if isScaleUp {
// Check that we have the allocation available.
if err := js.jsClusteredStreamLimitsCheck(acc, newCfg); err != nil {
resp.Error = err
@@ -6413,22 +6414,82 @@

// Need to remap any consumers.
for _, ca := range osa.consumers {
// Ephemerals are R=1, so only auto-remap durables, or R>1, unless stream is interest or workqueue policy.
// Legacy ephemerals are R=1 but present as R=0, so only auto-remap named consumers, or if we are downsizing the consumer peers.
// If the stream is interest or workqueue policy, always remap since they require peer parity with the stream.
numPeers := len(ca.Group.Peers)
if ca.Config.Durable != _EMPTY_ || numPeers > 1 || cfg.Retention != LimitsPolicy {
isAutoScale := ca.Config.Replicas == 0 && (ca.Config.Durable != _EMPTY_ || ca.Config.Name != _EMPTY_)
if isAutoScale || numPeers > len(rg.Peers) || cfg.Retention != LimitsPolicy {
cca := ca.copyGroup()
// Adjust preferred as needed.
if numPeers == 1 && len(rg.Peers) > 1 {
if numPeers == 1 && isScaleUp {
cca.Group.Preferred = ca.Group.Peers[0]
} else {
cca.Group.Preferred = _EMPTY_
}
// Assign new peers.
cca.Group.Peers = rg.Peers
// If the replicas were not 0, make sure it matches here.
if cca.Config.Replicas != 0 {
cca.Config.Replicas = len(rg.Peers)
}
// We can not propose here before the stream itself so we collect them.
consumers = append(consumers, cca)

} else if !isScaleUp {
// We decided to leave this consumer's peer group alone but we are also scaling down.
// We need to make sure we do not have any peers that are no longer part of the stream.
// Note we handle scaling down of a consumer above if its number of peers was > the new stream peers.
var needReplace []string
for _, rp := range ca.Group.Peers {
// Check if we have an orphaned peer now for this consumer.
if !rg.isMember(rp) {
needReplace = append(needReplace, rp)
}
}
if len(needReplace) > 0 {
newPeers := copyStrings(rg.Peers)
rand.Shuffle(len(newPeers), func(i, j int) { newPeers[i], newPeers[j] = newPeers[j], newPeers[i] })
// If we had a smaller size than the new peer set, restrict to the same number.
if lp := len(ca.Group.Peers); lp < len(newPeers) {
newPeers = newPeers[:lp]
}
cca := ca.copyGroup()
// Assign new peers.
cca.Group.Peers = newPeers
// If the replicas were not 0, make sure it matches here.
if cca.Config.Replicas != 0 {
cca.Config.Replicas = len(newPeers)
}
// Check if all peers are invalid. This can happen with R1 under replicated streams that are being scaled down.
if len(needReplace) == len(ca.Group.Peers) {
// We have to transfer state to new peers.
// We will grab our state and attach it to the new assignment.
// TODO(dlc) - In practice we would want to make sure the consumer is paused.
// Need to release js lock.
js.mu.Unlock()
if ci, err := sysRequest[ConsumerInfo](s, clusterConsumerInfoT, acc, osa.Config.Name, ca.Name); err != nil {
s.Warnf("Did not receive consumer info results for '%s > %s > %s' due to: %s", acc, osa.Config.Name, ca.Name, err)
} else if ci != nil {
cca.State = &ConsumerState{
Delivered: SequencePair{
Consumer: ci.Delivered.Consumer,
Stream: ci.Delivered.Stream,
},
AckFloor: SequencePair{
Consumer: ci.AckFloor.Consumer,
Stream: ci.AckFloor.Stream,
},
}
}
// Re-acquire here.
js.mu.Lock()
}
// We can not propose here before the stream itself so we collect them.
consumers = append(consumers, cca)
}
}
}

} else if isMoveRequest {
if len(peerSet) == 0 {
nrg, err := js.createGroupForStream(ci, newCfg)
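
The scale-down branch above boils down to: keep the consumer's replica count, but replace any peers that are no longer members of the stream's new peer group. A standalone sketch of that peer-replacement step follows; the types and function name are simplified placeholders for illustration, not the server's actual raft group structures.

package main

import (
	"fmt"
	"math/rand"
)

// remapConsumerPeers returns a new peer set for a consumer whose current peers
// may include members that are no longer in the stream's peer group, or nil if
// no remapping is needed.
func remapConsumerPeers(consumerPeers, streamPeers []string) []string {
	inStream := make(map[string]struct{}, len(streamPeers))
	for _, p := range streamPeers {
		inStream[p] = struct{}{}
	}
	// Collect orphaned peers, i.e. consumer peers not in the new stream group.
	var orphaned []string
	for _, p := range consumerPeers {
		if _, ok := inStream[p]; !ok {
			orphaned = append(orphaned, p)
		}
	}
	if len(orphaned) == 0 {
		return nil
	}
	// Choose replacements from the stream's peers at random, keeping the
	// consumer's smaller replica count if it has one.
	newPeers := append([]string(nil), streamPeers...)
	rand.Shuffle(len(newPeers), func(i, j int) { newPeers[i], newPeers[j] = newPeers[j], newPeers[i] })
	if len(consumerPeers) < len(newPeers) {
		newPeers = newPeers[:len(consumerPeers)]
	}
	return newPeers
}

func main() {
	// Stream scaled down from five peers to three; an R1 consumer was pinned
	// to a peer that is no longer part of the stream.
	streamPeers := []string{"s1", "s2", "s3"}
	consumerPeers := []string{"s5"}
	fmt.Println(remapConsumerPeers(consumerPeers, streamPeers))
}

In the actual change, when every peer of the consumer is orphaned (the R1 case), the server additionally requests the consumer's info over the system account and attaches its Delivered and AckFloor sequence pairs to the new assignment, which is what the R1 portion of the test below verifies.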
139 changes: 139 additions & 0 deletions server/jetstream_cluster_4_test.go
@@ -3560,4 +3560,143 @@ func TestJetStreamPendingRequestsInJsz(t *testing.T) {
require_NoError(t, err)
require_True(t, jsz.Meta != nil)
require_NotEqual(t, jsz.Meta.Pending, 0)

snc, _ := jsClientConnect(t, c.randomServer(), nats.UserInfo("admin", "s3cr3t!"))
defer snc.Close()

ch := make(chan *nats.Msg, 1)
ssub, err := snc.ChanSubscribe(fmt.Sprintf(serverStatsSubj, metaleader.ID()), ch)
require_NoError(t, err)
require_NoError(t, ssub.AutoUnsubscribe(1))

msg = require_ChanRead(t, ch, time.Second*5)
var m ServerStatsMsg
require_NoError(t, json.Unmarshal(msg.Data, &m))
require_True(t, m.Stats.JetStream != nil)
require_NotEqual(t, m.Stats.JetStream.Meta.Pending, 0)
}

func TestJetStreamConsumerReplicasAfterScale(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R5S", 5)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomNonLeader())
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 5,
})
require_NoError(t, err)

// Put some messages in to test consumer state transfer.
for i := 0; i < 100; i++ {
js.PublishAsync("foo", []byte("ok"))
}
select {
case <-js.PublishAsyncComplete():
case <-time.After(5 * time.Second):
t.Fatalf("Did not receive completion signal")
}

// Create four different consumers.
// Normal where we inherit replicas from parent.
ci, err := js.AddConsumer("TEST", &nats.ConsumerConfig{
Durable: "dur",
AckPolicy: nats.AckExplicitPolicy,
})
require_NoError(t, err)
require_Equal(t, ci.Config.Replicas, 0)
require_Equal(t, len(ci.Cluster.Replicas), 4)

// Ephemeral
ci, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
AckPolicy: nats.AckExplicitPolicy,
})
require_NoError(t, err)
require_Equal(t, ci.Config.Replicas, 0) // Legacy ephemeral is 0 here too.
require_Equal(t, len(ci.Cluster.Replicas), 0)
eName := ci.Name

// R1
ci, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
Durable: "r1",
AckPolicy: nats.AckExplicitPolicy,
Replicas: 1,
})
require_NoError(t, err)
require_Equal(t, ci.Config.Replicas, 1)
require_Equal(t, len(ci.Cluster.Replicas), 0)

// R3
ci, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
Name: "r3",
AckPolicy: nats.AckExplicitPolicy,
Replicas: 3,
})
require_NoError(t, err)
require_Equal(t, ci.Config.Replicas, 3)
require_Equal(t, len(ci.Cluster.Replicas), 2)

// Now create some state on r1 consumer.
sub, err := js.PullSubscribe("foo", "r1")
require_NoError(t, err)

fetch := rand.Intn(99) + 1 // Needs to be at least 1.
msgs, err := sub.Fetch(fetch, nats.MaxWait(10*time.Second))
require_NoError(t, err)
require_Equal(t, len(msgs), fetch)
ack := rand.Intn(fetch)
for i := 0; i <= ack; i++ {
msgs[i].AckSync()
}
r1ci, err := js.ConsumerInfo("TEST", "r1")
require_NoError(t, err)
r1ci.Delivered.Last, r1ci.AckFloor.Last = nil, nil

// Now scale stream to R3.
_, err = js.UpdateStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 3,
})
require_NoError(t, err)

c.waitOnStreamLeader(globalAccountName, "TEST")

// Now check each.
c.waitOnConsumerLeader(globalAccountName, "TEST", "dur")
ci, err = js.ConsumerInfo("TEST", "dur")
require_NoError(t, err)
require_Equal(t, ci.Config.Replicas, 0)
require_Equal(t, len(ci.Cluster.Replicas), 2)

c.waitOnConsumerLeader(globalAccountName, "TEST", eName)
ci, err = js.ConsumerInfo("TEST", eName)
require_NoError(t, err)
require_Equal(t, ci.Config.Replicas, 0)
require_Equal(t, len(ci.Cluster.Replicas), 0)

c.waitOnConsumerLeader(globalAccountName, "TEST", "r1")
ci, err = js.ConsumerInfo("TEST", "r1")
require_NoError(t, err)
require_Equal(t, ci.Config.Replicas, 1)
require_Equal(t, len(ci.Cluster.Replicas), 0)
// Now check that state transferred correctly.
ci.Delivered.Last, ci.AckFloor.Last = nil, nil
if ci.Delivered != r1ci.Delivered {
t.Fatalf("Delivered state for R1 incorrect, wanted %+v got %+v",
r1ci.Delivered, ci.Delivered)
}
if ci.AckFloor != r1ci.AckFloor {
t.Fatalf("AckFloor state for R1 incorrect, wanted %+v got %+v",
r1ci.AckFloor, ci.AckFloor)
}

c.waitOnConsumerLeader(globalAccountName, "TEST", "r3")
ci, err = js.ConsumerInfo("TEST", "r3")
require_NoError(t, err)
require_Equal(t, ci.Config.Replicas, 3)
require_Equal(t, len(ci.Cluster.Replicas), 2)
}
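
To exercise the new test on its own from a checkout of this repository, a standard go test invocation scoped to the server package should work; the flags below are ordinary Go tooling options rather than anything introduced by this commit, and the timeout is a guess to accommodate cluster startup:

go test -run TestJetStreamConsumerReplicasAfterScale -count=1 -timeout=10m ./server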
2 changes: 1 addition & 1 deletion server/server.go
@@ -1705,7 +1705,7 @@ func (s *Server) setSystemAccount(acc *Account) error {
recvqp: newIPQueue[*inSysMsg](s, "System recvQ Pings"),
resetCh: make(chan struct{}),
sq: s.newSendQ(),
statsz: eventsHBInterval,
statsz: statsHBInterval,
orphMax: 5 * eventsHBInterval,
chkOrph: 3 * eventsHBInterval,
}
