Skip to content

Commit

Permalink
Fixes for JetStream accounting for replicas. Also added in reporting …
Browse files Browse the repository at this point in the history
…for account info for reserved storage.

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Dec 24, 2023
1 parent 6f831d3 commit 7260c63
Show file tree
Hide file tree
Showing 5 changed files with 195 additions and 49 deletions.
74 changes: 66 additions & 8 deletions server/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,13 @@ type JetStreamAccountLimits struct {
}

type JetStreamTier struct {
Memory uint64 `json:"memory"`
Store uint64 `json:"storage"`
Streams int `json:"streams"`
Consumers int `json:"consumers"`
Limits JetStreamAccountLimits `json:"limits"`
Memory uint64 `json:"memory"`
Store uint64 `json:"storage"`
ReservedMemory uint64 `json:"reserved_memory"`
ReservedStore uint64 `json:"reserved_storage"`
Streams int `json:"streams"`
Consumers int `json:"consumers"`
Limits JetStreamAccountLimits `json:"limits"`
}

// JetStreamAccountStats returns current statistics about the account's JetStream usage.
Expand Down Expand Up @@ -1580,6 +1582,40 @@ func diffCheckedLimits(a, b map[string]JetStreamAccountLimits) map[string]JetStr
return diff
}

// Return reserved bytes for memory and store for this account on this server.
// Lock should be held.
func (jsa *jsAccount) reservedStorage(tier string) (mem, store uint64) {
for _, mset := range jsa.streams {
cfg := &mset.cfg
if tier == _EMPTY_ || tier == tierName(cfg) && cfg.MaxBytes > 0 {
switch cfg.Storage {
case FileStorage:
store += uint64(cfg.MaxBytes)
case MemoryStorage:
mem += uint64(cfg.MaxBytes)
}
}
}
return mem, store
}

// Return reserved bytes for memory and store for this account in clustered mode.
// js lock should be held.
func reservedStorage(sas map[string]*streamAssignment, tier string) (mem, store uint64) {
for _, sa := range sas {
cfg := sa.Config
if tier == _EMPTY_ || tier == tierName(cfg) && cfg.MaxBytes > 0 {
switch cfg.Storage {
case FileStorage:
store += uint64(cfg.MaxBytes)
case MemoryStorage:
mem += uint64(cfg.MaxBytes)
}
}
}
return mem, store
}

// JetStreamUsage reports on JetStream usage and limits for an account.
func (a *Account) JetStreamUsage() JetStreamAccountStats {
a.mu.RLock()
Expand All @@ -1591,6 +1627,8 @@ func (a *Account) JetStreamUsage() JetStreamAccountStats {
if jsa != nil {
js := jsa.js
js.mu.RLock()
cc := js.cluster
singleServer := cc == nil
jsa.mu.RLock()
jsa.usageMu.RLock()
stats.Memory, stats.Store = jsa.storageTotals()
Expand All @@ -1599,6 +1637,11 @@ func (a *Account) JetStreamUsage() JetStreamAccountStats {
Total: jsa.apiTotal,
Errors: jsa.apiErrors,
}
if singleServer {
stats.ReservedMemory, stats.ReservedStore = jsa.reservedStorage(_EMPTY_)
} else {
stats.ReservedMemory, stats.ReservedStore = reservedStorage(cc.streams[aname], _EMPTY_)
}
l, defaultTier := jsa.limits[_EMPTY_]
if defaultTier {
stats.Limits = l
Expand All @@ -1611,27 +1654,42 @@ func (a *Account) JetStreamUsage() JetStreamAccountStats {
// In case this shows an empty stream, that tier will be added when iterating over streams
skipped++
} else {
stats.Tiers[t] = JetStreamTier{
tier := JetStreamTier{
Memory: uint64(total.total.mem),
Store: uint64(total.total.store),
Limits: jsa.limits[t],
}
if singleServer {
tier.ReservedMemory, tier.ReservedStore = jsa.reservedStorage(t)
} else {
tier.ReservedMemory, tier.ReservedStore = reservedStorage(cc.streams[aname], t)
}
stats.Tiers[t] = tier
}
}
if len(accJsLimits) != len(jsa.usage)-skipped {
// insert unused limits
for t, lim := range accJsLimits {
if _, ok := stats.Tiers[t]; !ok {
stats.Tiers[t] = JetStreamTier{Limits: lim}
tier := JetStreamTier{Limits: lim}
if singleServer {
tier.ReservedMemory, tier.ReservedStore = jsa.reservedStorage(t)
} else {
tier.ReservedMemory, tier.ReservedStore = reservedStorage(cc.streams[aname], t)
}
stats.Tiers[t] = tier
}
}
}
}
jsa.usageMu.RUnlock()
if cc := jsa.js.cluster; cc != nil {

// Clustered
if cc := js.cluster; cc != nil {
sas := cc.streams[aname]
if defaultTier {
stats.Streams = len(sas)
stats.ReservedMemory, stats.ReservedStore = reservedStorage(sas, _EMPTY_)
}
for _, sa := range sas {
stats.Consumers += len(sa.consumers)
Expand Down
31 changes: 12 additions & 19 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -5694,25 +5694,18 @@ func groupName(prefix string, peers []string, storage StorageType) string {
// returns stream count for this tier as well as applicable reservation size (not including reservations for cfg)
// jetStream read lock should be held
func tieredStreamAndReservationCount(asa map[string]*streamAssignment, tier string, cfg *StreamConfig) (int, int64) {
numStreams := len(asa)
reservation := int64(0)
if tier == _EMPTY_ {
for _, sa := range asa {
if sa.Config.MaxBytes > 0 && sa.Config.Name != cfg.Name {
if sa.Config.Storage == cfg.Storage {
reservation += (int64(sa.Config.Replicas) * sa.Config.MaxBytes)
}
}
}
} else {
numStreams = 0
for _, sa := range asa {
if isSameTier(sa.Config, cfg) {
numStreams++
if sa.Config.MaxBytes > 0 {
if sa.Config.Storage == cfg.Storage && sa.Config.Name != cfg.Name {
reservation += (int64(sa.Config.Replicas) * sa.Config.MaxBytes)
}
var numStreams int
var reservation int64
for _, sa := range asa {
if tier == _EMPTY_ || isSameTier(sa.Config, cfg) {
numStreams++
if sa.Config.MaxBytes > 0 && sa.Config.Storage == cfg.Storage && sa.Config.Name != cfg.Name {
// If tier is empty, all storage is flat and we should adjust for replicas.
// Otherwise if tiered, storage replication already taken into consideration.
if tier == _EMPTY_ && cfg.Replicas > 1 {
reservation += sa.Config.MaxBytes * int64(cfg.Replicas)
} else {
reservation += sa.Config.MaxBytes
}
}
}
Expand Down
14 changes: 7 additions & 7 deletions server/jetstream_cluster_2_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2020-2022 The NATS Authors
// Copyright 2020-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand All @@ -23,6 +23,7 @@ import (
"encoding/binary"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"math/rand"
"os"
Expand Down Expand Up @@ -3642,14 +3643,15 @@ func TestJetStreamClusterAccountReservations(t *testing.T) {
defer nc.Close()
accMax := 3

errResources := errors.New("nats: insufficient storage resources available")

test := func(t *testing.T, replica int) {
mb := int64((1+accMax)-replica) * 1024 * 1024 * 1024 // GB, corrected for replication factor
_, err := js.AddStream(&nats.StreamConfig{Name: "S1", Subjects: []string{"s1"}, MaxBytes: mb, Replicas: replica})
require_NoError(t, err)

_, err = js.AddStream(&nats.StreamConfig{Name: "S2", Subjects: []string{"s2"}, MaxBytes: 1024, Replicas: replica})
require_Error(t, err)
require_Equal(t, err.Error(), "nats: insufficient storage resources available")
require_Error(t, err, errResources)

_, err = js.UpdateStream(&nats.StreamConfig{Name: "S1", Subjects: []string{"s1"}, MaxBytes: mb / 2, Replicas: replica})
require_NoError(t, err)
Expand All @@ -3658,12 +3660,10 @@ func TestJetStreamClusterAccountReservations(t *testing.T) {
require_NoError(t, err)

_, err = js.AddStream(&nats.StreamConfig{Name: "S3", Subjects: []string{"s3"}, MaxBytes: 1024, Replicas: replica})
require_Error(t, err)
require_Equal(t, err.Error(), "nats: insufficient storage resources available")
require_Error(t, err, errResources)

_, err = js.UpdateStream(&nats.StreamConfig{Name: "S2", Subjects: []string{"s2"}, MaxBytes: mb/2 + 1, Replicas: replica})
require_Error(t, err)
require_Equal(t, err.Error(), "nats: insufficient storage resources available")
require_Error(t, err, errResources)

require_NoError(t, js.DeleteStream("S1"))
require_NoError(t, js.DeleteStream("S2"))
Expand Down
108 changes: 103 additions & 5 deletions server/jetstream_jwt_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2020-2022 The NATS Authors
// Copyright 2020-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -576,7 +576,7 @@ func TestJetStreamJWTClusteredTiersChange(t *testing.T) {
accClaim.Limits.JetStreamTieredLimits["R1"] = jwt.JetStreamLimits{
DiskStorage: 1000, MemoryStorage: 0, Consumer: 1, Streams: 1}
accClaim.Limits.JetStreamTieredLimits["R3"] = jwt.JetStreamLimits{
DiskStorage: 1500, MemoryStorage: 0, Consumer: 1, Streams: 1}
DiskStorage: 500, MemoryStorage: 0, Consumer: 1, Streams: 1}
accJwt1 := encodeClaim(t, accClaim, aExpPub)
accCreds := newUser(t, accKp)
start := time.Now()
Expand Down Expand Up @@ -622,12 +622,11 @@ func TestJetStreamJWTClusteredTiersChange(t *testing.T) {

cfg.Replicas = 3
_, err = js.UpdateStream(cfg)
require_Error(t, err)
require_Equal(t, err.Error(), "nats: insufficient storage resources available")
require_Error(t, err, errors.New("nats: insufficient storage resources available"))

time.Sleep(time.Second - time.Since(start)) // make sure the time stamp changes
accClaim.Limits.JetStreamTieredLimits["R3"] = jwt.JetStreamLimits{
DiskStorage: 3000, MemoryStorage: 0, Consumer: 1, Streams: 1}
DiskStorage: 1000, MemoryStorage: 0, Consumer: 1, Streams: 1}
accJwt2 := encodeClaim(t, accClaim, aExpPub)

updateJwt(t, c.randomServer().ClientURL(), sysCreds, accJwt2, 3)
Expand Down Expand Up @@ -1362,3 +1361,102 @@ func TestJetStreamJWTHAStorageLimitsAndAccounting(t *testing.T) {
delta = maxMemStorage - int64(si.State.Bytes)
require_True(t, int(delta) < len(msg))
}

func TestJetStreamJWTHAStorageLimitsOnScaleAndUpdate(t *testing.T) {
sysKp, syspub := createKey(t)
sysJwt := encodeClaim(t, jwt.NewAccountClaims(syspub), syspub)
newUser(t, sysKp)

maxFileStorage := int64(5 * 1024 * 1024)
maxMemStorage := int64(1 * 1024 * 1024)

accKp, aExpPub := createKey(t)
accClaim := jwt.NewAccountClaims(aExpPub)
accClaim.Name = "acc"
accClaim.Limits.JetStreamTieredLimits["R3"] = jwt.JetStreamLimits{DiskStorage: maxFileStorage, MemoryStorage: maxMemStorage}
accClaim.Limits.JetStreamTieredLimits["R1"] = jwt.JetStreamLimits{DiskStorage: maxFileStorage, MemoryStorage: maxMemStorage}

accJwt := encodeClaim(t, accClaim, aExpPub)
accCreds := newUser(t, accKp)
tmlp := `
listen: 127.0.0.1:-1
server_name: %s
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}
leaf { listen: 127.0.0.1:-1 }
cluster {
name: %s
listen: 127.0.0.1:%d
routes = [%s]
}
` + fmt.Sprintf(`
operator: %s
system_account: %s
resolver = MEMORY
resolver_preload = {
%s : %s
%s : %s
}
`, ojwt, syspub, syspub, sysJwt, aExpPub, accJwt)

c := createJetStreamClusterWithTemplate(t, tmlp, "cluster", 3)
defer c.shutdown()

nc := natsConnect(t, c.randomServer().ClientURL(), nats.UserCredentials(accCreds))
defer nc.Close()

js, err := nc.JetStream()
require_NoError(t, err)

// Test max bytes first.
_, err = js.AddStream(&nats.StreamConfig{Name: "TEST", Replicas: 3, MaxBytes: maxFileStorage, Subjects: []string{"foo"}})
require_NoError(t, err)
// Now delete
require_NoError(t, js.DeleteStream("TEST"))
// Now do 5 1MB streams.
for i := 1; i <= 5; i++ {
sname := fmt.Sprintf("TEST%d", i)
_, err = js.AddStream(&nats.StreamConfig{Name: sname, Replicas: 3, MaxBytes: 1 * 1024 * 1024})
require_NoError(t, err)
}
// Should fail.
_, err = js.AddStream(&nats.StreamConfig{Name: "TEST6", Replicas: 3, MaxBytes: 1 * 1024 * 1024})
require_Error(t, err, errors.New("insufficient storage resources"))

// Update Test1 and Test2 to smaller reservations.
_, err = js.UpdateStream(&nats.StreamConfig{Name: "TEST1", Replicas: 3, MaxBytes: 512 * 1024})
require_NoError(t, err)
_, err = js.UpdateStream(&nats.StreamConfig{Name: "TEST2", Replicas: 3, MaxBytes: 512 * 1024})
require_NoError(t, err)
// Now make sure TEST6 succeeds.
_, err = js.AddStream(&nats.StreamConfig{Name: "TEST6", Replicas: 3, MaxBytes: 1 * 1024 * 1024})
require_NoError(t, err)
// Now delete the R3 version.
require_NoError(t, js.DeleteStream("TEST6"))
// Now do R1 version and then we will scale up.
_, err = js.AddStream(&nats.StreamConfig{Name: "TEST6", Replicas: 1, MaxBytes: 1 * 1024 * 1024})
require_NoError(t, err)
// Now make sure scale up works.
_, err = js.UpdateStream(&nats.StreamConfig{Name: "TEST6", Replicas: 3, MaxBytes: 1 * 1024 * 1024})
require_NoError(t, err)
// Add in a few more streams to check reserved reporting in account info.
_, err = js.AddStream(&nats.StreamConfig{Name: "TEST7", Replicas: 1, MaxBytes: 2 * 1024 * 1024})
require_NoError(t, err)
_, err = js.AddStream(&nats.StreamConfig{Name: "TEST8", Replicas: 1, MaxBytes: 256 * 1024, Storage: nats.MemoryStorage})
require_NoError(t, err)
_, err = js.AddStream(&nats.StreamConfig{Name: "TEST9", Replicas: 3, MaxBytes: 22 * 1024, Storage: nats.MemoryStorage})
require_NoError(t, err)

// Now make sure we report reserved correctly.
// Do this direct to server since client does not support it yet.
var info JSApiAccountInfoResponse
resp, err := nc.Request("$JS.API.INFO", nil, time.Second)
require_NoError(t, err)
require_NoError(t, json.Unmarshal(resp.Data, &info))
stats := info.JetStreamAccountStats
r1, r3 := stats.Tiers["R1"], stats.Tiers["R3"]

require_Equal(t, r1.ReservedMemory, 256*1024) // TEST8
require_Equal(t, r1.ReservedStore, 2*1024*1024) // TEST7
require_Equal(t, r3.ReservedMemory, 22*1024) // TEST9
require_Equal(t, r3.ReservedStore, 5*1024*1024) // TEST1-TEST6
}
17 changes: 7 additions & 10 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,9 +369,7 @@ type ddentry struct {
}

// Replicas Range
const (
StreamMaxReplicas = 5
)
const StreamMaxReplicas = 5

// AddStream adds a stream for the given account.
func (a *Account) addStream(config *StreamConfig) (*stream, error) {
Expand Down Expand Up @@ -1255,6 +1253,7 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account) (StreamConfi
}
return exists, cfg
}

hasStream := func(streamName string) (bool, int32, []string) {
exists, cfg := getStream(streamName)
return exists, cfg.MaxMsgSize, cfg.Subjects
Expand Down Expand Up @@ -1633,13 +1632,7 @@ func (jsa *jsAccount) configUpdateCheck(old, new *StreamConfig, s *Server) (*Str

// Save the user configured MaxBytes.
newMaxBytes := cfg.MaxBytes

maxBytesOffset := int64(0)
if old.MaxBytes > 0 {
if excessRep := cfg.Replicas - old.Replicas; excessRep > 0 {
maxBytesOffset = old.MaxBytes * int64(excessRep)
}
}

// We temporarily set cfg.MaxBytes to maxBytesDiff because checkAllLimits
// adds cfg.MaxBytes to the current reserved limit and checks if we've gone
Expand Down Expand Up @@ -1671,7 +1664,11 @@ func (jsa *jsAccount) configUpdateCheck(old, new *StreamConfig, s *Server) (*Str
_, reserved = tieredStreamAndReservationCount(js.cluster.streams[acc.Name], tier, &cfg)
}
// reservation does not account for this stream, hence add the old value
reserved += int64(old.Replicas) * old.MaxBytes
if tier == _EMPTY_ && old.Replicas > 1 {
reserved += old.MaxBytes * int64(old.Replicas)
} else {
reserved += old.MaxBytes
}
if err := js.checkAllLimits(&selected, &cfg, reserved, maxBytesOffset); err != nil {
return nil, err
}
Expand Down

0 comments on commit 7260c63

Please sign in to comment.