diff --git a/server/jetstream.go b/server/jetstream.go index d146186efc8..6b333878ab2 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -71,11 +71,13 @@ type JetStreamAccountLimits struct { } type JetStreamTier struct { - Memory uint64 `json:"memory"` - Store uint64 `json:"storage"` - Streams int `json:"streams"` - Consumers int `json:"consumers"` - Limits JetStreamAccountLimits `json:"limits"` + Memory uint64 `json:"memory"` + Store uint64 `json:"storage"` + ReservedMemory uint64 `json:"reserved_memory"` + ReservedStore uint64 `json:"reserved_storage"` + Streams int `json:"streams"` + Consumers int `json:"consumers"` + Limits JetStreamAccountLimits `json:"limits"` } // JetStreamAccountStats returns current statistics about the account's JetStream usage. @@ -1580,6 +1582,40 @@ func diffCheckedLimits(a, b map[string]JetStreamAccountLimits) map[string]JetStr return diff } +// Return reserved bytes for memory and store for this account on this server. +// Lock should be held. +func (jsa *jsAccount) reservedStorage(tier string) (mem, store uint64) { + for _, mset := range jsa.streams { + cfg := &mset.cfg + if tier == _EMPTY_ || tier == tierName(cfg) && cfg.MaxBytes > 0 { + switch cfg.Storage { + case FileStorage: + store += uint64(cfg.MaxBytes) + case MemoryStorage: + mem += uint64(cfg.MaxBytes) + } + } + } + return mem, store +} + +// Return reserved bytes for memory and store for this account in clustered mode. +// js lock should be held. +func reservedStorage(sas map[string]*streamAssignment, tier string) (mem, store uint64) { + for _, sa := range sas { + cfg := sa.Config + if tier == _EMPTY_ || tier == tierName(cfg) && cfg.MaxBytes > 0 { + switch cfg.Storage { + case FileStorage: + store += uint64(cfg.MaxBytes) + case MemoryStorage: + mem += uint64(cfg.MaxBytes) + } + } + } + return mem, store +} + // JetStreamUsage reports on JetStream usage and limits for an account. func (a *Account) JetStreamUsage() JetStreamAccountStats { a.mu.RLock() @@ -1591,6 +1627,8 @@ func (a *Account) JetStreamUsage() JetStreamAccountStats { if jsa != nil { js := jsa.js js.mu.RLock() + cc := js.cluster + singleServer := cc == nil jsa.mu.RLock() jsa.usageMu.RLock() stats.Memory, stats.Store = jsa.storageTotals() @@ -1599,6 +1637,11 @@ func (a *Account) JetStreamUsage() JetStreamAccountStats { Total: jsa.apiTotal, Errors: jsa.apiErrors, } + if singleServer { + stats.ReservedMemory, stats.ReservedStore = jsa.reservedStorage(_EMPTY_) + } else { + stats.ReservedMemory, stats.ReservedStore = reservedStorage(cc.streams[aname], _EMPTY_) + } l, defaultTier := jsa.limits[_EMPTY_] if defaultTier { stats.Limits = l @@ -1611,27 +1654,42 @@ func (a *Account) JetStreamUsage() JetStreamAccountStats { // In case this shows an empty stream, that tier will be added when iterating over streams skipped++ } else { - stats.Tiers[t] = JetStreamTier{ + tier := JetStreamTier{ Memory: uint64(total.total.mem), Store: uint64(total.total.store), Limits: jsa.limits[t], } + if singleServer { + tier.ReservedMemory, tier.ReservedStore = jsa.reservedStorage(t) + } else { + tier.ReservedMemory, tier.ReservedStore = reservedStorage(cc.streams[aname], t) + } + stats.Tiers[t] = tier } } if len(accJsLimits) != len(jsa.usage)-skipped { // insert unused limits for t, lim := range accJsLimits { if _, ok := stats.Tiers[t]; !ok { - stats.Tiers[t] = JetStreamTier{Limits: lim} + tier := JetStreamTier{Limits: lim} + if singleServer { + tier.ReservedMemory, tier.ReservedStore = jsa.reservedStorage(t) + } else { + tier.ReservedMemory, tier.ReservedStore = reservedStorage(cc.streams[aname], t) + } + stats.Tiers[t] = tier } } } } jsa.usageMu.RUnlock() - if cc := jsa.js.cluster; cc != nil { + + // Clustered + if cc := js.cluster; cc != nil { sas := cc.streams[aname] if defaultTier { stats.Streams = len(sas) + stats.ReservedMemory, stats.ReservedStore = reservedStorage(sas, _EMPTY_) } for _, sa := range sas { stats.Consumers += len(sa.consumers) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 6168a2db870..bc1b5e28ca5 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -5694,25 +5694,18 @@ func groupName(prefix string, peers []string, storage StorageType) string { // returns stream count for this tier as well as applicable reservation size (not including reservations for cfg) // jetStream read lock should be held func tieredStreamAndReservationCount(asa map[string]*streamAssignment, tier string, cfg *StreamConfig) (int, int64) { - numStreams := len(asa) - reservation := int64(0) - if tier == _EMPTY_ { - for _, sa := range asa { - if sa.Config.MaxBytes > 0 && sa.Config.Name != cfg.Name { - if sa.Config.Storage == cfg.Storage { - reservation += (int64(sa.Config.Replicas) * sa.Config.MaxBytes) - } - } - } - } else { - numStreams = 0 - for _, sa := range asa { - if isSameTier(sa.Config, cfg) { - numStreams++ - if sa.Config.MaxBytes > 0 { - if sa.Config.Storage == cfg.Storage && sa.Config.Name != cfg.Name { - reservation += (int64(sa.Config.Replicas) * sa.Config.MaxBytes) - } + var numStreams int + var reservation int64 + for _, sa := range asa { + if tier == _EMPTY_ || isSameTier(sa.Config, cfg) { + numStreams++ + if sa.Config.MaxBytes > 0 && sa.Config.Storage == cfg.Storage && sa.Config.Name != cfg.Name { + // If tier is empty, all storage is flat and we should adjust for replicas. + // Otherwise if tiered, storage replication already taken into consideration. + if tier == _EMPTY_ && cfg.Replicas > 1 { + reservation += sa.Config.MaxBytes * int64(cfg.Replicas) + } else { + reservation += sa.Config.MaxBytes } } } diff --git a/server/jetstream_cluster_2_test.go b/server/jetstream_cluster_2_test.go index bfcd7e39bcf..7a6f60e6437 100644 --- a/server/jetstream_cluster_2_test.go +++ b/server/jetstream_cluster_2_test.go @@ -1,4 +1,4 @@ -// Copyright 2020-2022 The NATS Authors +// Copyright 2020-2023 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -23,6 +23,7 @@ import ( "encoding/binary" "encoding/hex" "encoding/json" + "errors" "fmt" "math/rand" "os" @@ -3642,14 +3643,15 @@ func TestJetStreamClusterAccountReservations(t *testing.T) { defer nc.Close() accMax := 3 + errResources := errors.New("nats: insufficient storage resources available") + test := func(t *testing.T, replica int) { mb := int64((1+accMax)-replica) * 1024 * 1024 * 1024 // GB, corrected for replication factor _, err := js.AddStream(&nats.StreamConfig{Name: "S1", Subjects: []string{"s1"}, MaxBytes: mb, Replicas: replica}) require_NoError(t, err) _, err = js.AddStream(&nats.StreamConfig{Name: "S2", Subjects: []string{"s2"}, MaxBytes: 1024, Replicas: replica}) - require_Error(t, err) - require_Equal(t, err.Error(), "nats: insufficient storage resources available") + require_Error(t, err, errResources) _, err = js.UpdateStream(&nats.StreamConfig{Name: "S1", Subjects: []string{"s1"}, MaxBytes: mb / 2, Replicas: replica}) require_NoError(t, err) @@ -3658,12 +3660,10 @@ func TestJetStreamClusterAccountReservations(t *testing.T) { require_NoError(t, err) _, err = js.AddStream(&nats.StreamConfig{Name: "S3", Subjects: []string{"s3"}, MaxBytes: 1024, Replicas: replica}) - require_Error(t, err) - require_Equal(t, err.Error(), "nats: insufficient storage resources available") + require_Error(t, err, errResources) _, err = js.UpdateStream(&nats.StreamConfig{Name: "S2", Subjects: []string{"s2"}, MaxBytes: mb/2 + 1, Replicas: replica}) - require_Error(t, err) - require_Equal(t, err.Error(), "nats: insufficient storage resources available") + require_Error(t, err, errResources) require_NoError(t, js.DeleteStream("S1")) require_NoError(t, js.DeleteStream("S2")) diff --git a/server/jetstream_jwt_test.go b/server/jetstream_jwt_test.go index aee469f98e0..e487053be79 100644 --- a/server/jetstream_jwt_test.go +++ b/server/jetstream_jwt_test.go @@ -1,4 +1,4 @@ -// Copyright 2020-2022 The NATS Authors +// Copyright 2020-2023 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -576,7 +576,7 @@ func TestJetStreamJWTClusteredTiersChange(t *testing.T) { accClaim.Limits.JetStreamTieredLimits["R1"] = jwt.JetStreamLimits{ DiskStorage: 1000, MemoryStorage: 0, Consumer: 1, Streams: 1} accClaim.Limits.JetStreamTieredLimits["R3"] = jwt.JetStreamLimits{ - DiskStorage: 1500, MemoryStorage: 0, Consumer: 1, Streams: 1} + DiskStorage: 500, MemoryStorage: 0, Consumer: 1, Streams: 1} accJwt1 := encodeClaim(t, accClaim, aExpPub) accCreds := newUser(t, accKp) start := time.Now() @@ -622,12 +622,11 @@ func TestJetStreamJWTClusteredTiersChange(t *testing.T) { cfg.Replicas = 3 _, err = js.UpdateStream(cfg) - require_Error(t, err) - require_Equal(t, err.Error(), "nats: insufficient storage resources available") + require_Error(t, err, errors.New("nats: insufficient storage resources available")) time.Sleep(time.Second - time.Since(start)) // make sure the time stamp changes accClaim.Limits.JetStreamTieredLimits["R3"] = jwt.JetStreamLimits{ - DiskStorage: 3000, MemoryStorage: 0, Consumer: 1, Streams: 1} + DiskStorage: 1000, MemoryStorage: 0, Consumer: 1, Streams: 1} accJwt2 := encodeClaim(t, accClaim, aExpPub) updateJwt(t, c.randomServer().ClientURL(), sysCreds, accJwt2, 3) @@ -1362,3 +1361,102 @@ func TestJetStreamJWTHAStorageLimitsAndAccounting(t *testing.T) { delta = maxMemStorage - int64(si.State.Bytes) require_True(t, int(delta) < len(msg)) } + +func TestJetStreamJWTHAStorageLimitsOnScaleAndUpdate(t *testing.T) { + sysKp, syspub := createKey(t) + sysJwt := encodeClaim(t, jwt.NewAccountClaims(syspub), syspub) + newUser(t, sysKp) + + maxFileStorage := int64(5 * 1024 * 1024) + maxMemStorage := int64(1 * 1024 * 1024) + + accKp, aExpPub := createKey(t) + accClaim := jwt.NewAccountClaims(aExpPub) + accClaim.Name = "acc" + accClaim.Limits.JetStreamTieredLimits["R3"] = jwt.JetStreamLimits{DiskStorage: maxFileStorage, MemoryStorage: maxMemStorage} + accClaim.Limits.JetStreamTieredLimits["R1"] = jwt.JetStreamLimits{DiskStorage: maxFileStorage, MemoryStorage: maxMemStorage} + + accJwt := encodeClaim(t, accClaim, aExpPub) + accCreds := newUser(t, accKp) + tmlp := ` + listen: 127.0.0.1:-1 + server_name: %s + jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'} + leaf { listen: 127.0.0.1:-1 } + cluster { + name: %s + listen: 127.0.0.1:%d + routes = [%s] + } + ` + fmt.Sprintf(` + operator: %s + system_account: %s + resolver = MEMORY + resolver_preload = { + %s : %s + %s : %s + } + `, ojwt, syspub, syspub, sysJwt, aExpPub, accJwt) + + c := createJetStreamClusterWithTemplate(t, tmlp, "cluster", 3) + defer c.shutdown() + + nc := natsConnect(t, c.randomServer().ClientURL(), nats.UserCredentials(accCreds)) + defer nc.Close() + + js, err := nc.JetStream() + require_NoError(t, err) + + // Test max bytes first. + _, err = js.AddStream(&nats.StreamConfig{Name: "TEST", Replicas: 3, MaxBytes: maxFileStorage, Subjects: []string{"foo"}}) + require_NoError(t, err) + // Now delete + require_NoError(t, js.DeleteStream("TEST")) + // Now do 5 1MB streams. + for i := 1; i <= 5; i++ { + sname := fmt.Sprintf("TEST%d", i) + _, err = js.AddStream(&nats.StreamConfig{Name: sname, Replicas: 3, MaxBytes: 1 * 1024 * 1024}) + require_NoError(t, err) + } + // Should fail. + _, err = js.AddStream(&nats.StreamConfig{Name: "TEST6", Replicas: 3, MaxBytes: 1 * 1024 * 1024}) + require_Error(t, err, errors.New("insufficient storage resources")) + + // Update Test1 and Test2 to smaller reservations. + _, err = js.UpdateStream(&nats.StreamConfig{Name: "TEST1", Replicas: 3, MaxBytes: 512 * 1024}) + require_NoError(t, err) + _, err = js.UpdateStream(&nats.StreamConfig{Name: "TEST2", Replicas: 3, MaxBytes: 512 * 1024}) + require_NoError(t, err) + // Now make sure TEST6 succeeds. + _, err = js.AddStream(&nats.StreamConfig{Name: "TEST6", Replicas: 3, MaxBytes: 1 * 1024 * 1024}) + require_NoError(t, err) + // Now delete the R3 version. + require_NoError(t, js.DeleteStream("TEST6")) + // Now do R1 version and then we will scale up. + _, err = js.AddStream(&nats.StreamConfig{Name: "TEST6", Replicas: 1, MaxBytes: 1 * 1024 * 1024}) + require_NoError(t, err) + // Now make sure scale up works. + _, err = js.UpdateStream(&nats.StreamConfig{Name: "TEST6", Replicas: 3, MaxBytes: 1 * 1024 * 1024}) + require_NoError(t, err) + // Add in a few more streams to check reserved reporting in account info. + _, err = js.AddStream(&nats.StreamConfig{Name: "TEST7", Replicas: 1, MaxBytes: 2 * 1024 * 1024}) + require_NoError(t, err) + _, err = js.AddStream(&nats.StreamConfig{Name: "TEST8", Replicas: 1, MaxBytes: 256 * 1024, Storage: nats.MemoryStorage}) + require_NoError(t, err) + _, err = js.AddStream(&nats.StreamConfig{Name: "TEST9", Replicas: 3, MaxBytes: 22 * 1024, Storage: nats.MemoryStorage}) + require_NoError(t, err) + + // Now make sure we report reserved correctly. + // Do this direct to server since client does not support it yet. + var info JSApiAccountInfoResponse + resp, err := nc.Request("$JS.API.INFO", nil, time.Second) + require_NoError(t, err) + require_NoError(t, json.Unmarshal(resp.Data, &info)) + stats := info.JetStreamAccountStats + r1, r3 := stats.Tiers["R1"], stats.Tiers["R3"] + + require_Equal(t, r1.ReservedMemory, 256*1024) // TEST8 + require_Equal(t, r1.ReservedStore, 2*1024*1024) // TEST7 + require_Equal(t, r3.ReservedMemory, 22*1024) // TEST9 + require_Equal(t, r3.ReservedStore, 5*1024*1024) // TEST1-TEST6 +} diff --git a/server/stream.go b/server/stream.go index 7317837b328..bf0139b2b1d 100644 --- a/server/stream.go +++ b/server/stream.go @@ -369,9 +369,7 @@ type ddentry struct { } // Replicas Range -const ( - StreamMaxReplicas = 5 -) +const StreamMaxReplicas = 5 // AddStream adds a stream for the given account. func (a *Account) addStream(config *StreamConfig) (*stream, error) { @@ -1255,6 +1253,7 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account) (StreamConfi } return exists, cfg } + hasStream := func(streamName string) (bool, int32, []string) { exists, cfg := getStream(streamName) return exists, cfg.MaxMsgSize, cfg.Subjects @@ -1633,13 +1632,7 @@ func (jsa *jsAccount) configUpdateCheck(old, new *StreamConfig, s *Server) (*Str // Save the user configured MaxBytes. newMaxBytes := cfg.MaxBytes - maxBytesOffset := int64(0) - if old.MaxBytes > 0 { - if excessRep := cfg.Replicas - old.Replicas; excessRep > 0 { - maxBytesOffset = old.MaxBytes * int64(excessRep) - } - } // We temporarily set cfg.MaxBytes to maxBytesDiff because checkAllLimits // adds cfg.MaxBytes to the current reserved limit and checks if we've gone @@ -1671,7 +1664,11 @@ func (jsa *jsAccount) configUpdateCheck(old, new *StreamConfig, s *Server) (*Str _, reserved = tieredStreamAndReservationCount(js.cluster.streams[acc.Name], tier, &cfg) } // reservation does not account for this stream, hence add the old value - reserved += int64(old.Replicas) * old.MaxBytes + if tier == _EMPTY_ && old.Replicas > 1 { + reserved += old.MaxBytes * int64(old.Replicas) + } else { + reserved += old.MaxBytes + } if err := js.checkAllLimits(&selected, &cfg, reserved, maxBytesOffset); err != nil { return nil, err }