Skip to content

Commit

Permalink
feat(metarepos): add an upper limit for the number of log streams in …
Browse files Browse the repository at this point in the history
…a topic

This patch adds an upper limit for the number of log streams in a topic. The threshold can be set by
the `--max-logstreams-count-per-topic` flag to the metadata repository. If it is a negative value,
an unlimited number of log streams can be created per topic. Note that no log streams can be made if
it is zero.

When `internal/metarepos.(*MetadataStorage)` handles `registerLogStream`, it confirms whether the
number of log streams in the topic is greater than or equal to the upper limit.

Later, we can add a new feature, like a topic-specific upper limit of log stream count.

Resolves #297
  • Loading branch information
ijsong committed Dec 29, 2022
1 parent cdec2e7 commit ad2a60f
Show file tree
Hide file tree
Showing 7 changed files with 131 additions and 2 deletions.
5 changes: 5 additions & 0 deletions cmd/varlogmr/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,11 @@ var (
Usage: "Maximum number of topics, infinity if it is negative",
Value: metarepos.DefaultMaxTopicsCount,
}
flagMaxLogStreamsCountPerTopic = &cli.IntFlag{
Name: "max-logstreams-count-per-topic",
Usage: "Maximum number of log streams per topic, infinity if it is negative",
Value: metarepos.DefaultMaxLogStreamsCountPerTopic,
}

flagTelemetryCollectorName = flags.FlagDesc{
Name: "telemetry-collector-name",
Expand Down
2 changes: 2 additions & 0 deletions cmd/varlogmr/metadata_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func start(c *cli.Context) error {
metarepos.WithReportCommitterWriteBufferSize(int(writeBufferSize)),
metarepos.WithPeers(c.StringSlice(flagPeers.Name)...),
metarepos.WithMaxTopicsCount(int32(c.Int(flagMaxTopicsCount.Name))),
metarepos.WithMaxLogStreamsCountPerTopic(int32(c.Int(flagMaxLogStreamsCountPerTopic.Name))),
metarepos.WithTelemetryCollectorName(c.String(flagTelemetryCollectorName.Name)),
metarepos.WithTelemetryCollectorEndpoint(c.String(flagTelemetryCollectorEndpoint.Name)),
metarepos.WithLogger(logger),
Expand Down Expand Up @@ -121,6 +122,7 @@ func initCLI() *cli.App {
flagReportCommitterReadBufferSize.StringFlag(false, units.ToByteSizeString(metarepos.DefaultReportCommitterReadBufferSize)),
flagReportCommitterWriteBufferSize.StringFlag(false, units.ToByteSizeString(metarepos.DefaultReportCommitterWriteBufferSize)),
flagMaxTopicsCount,
flagMaxLogStreamsCountPerTopic,
flagTelemetryCollectorName.StringFlag(false, metarepos.DefaultTelemetryCollectorName),
flagTelemetryCollectorEndpoint.StringFlag(false, metarepos.DefaultTelmetryCollectorEndpoint),
flagLogDir.StringFlag(false, metarepos.DefaultLogDir),
Expand Down
11 changes: 10 additions & 1 deletion internal/metarepos/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ const (
DefaultReportCommitterReadBufferSize = 32 * 1024 // 32KB
DefaultReportCommitterWriteBufferSize = 32 * 1024 // 32KB

DefaultMaxTopicsCount = -1
DefaultMaxTopicsCount = -1
DefaultMaxLogStreamsCountPerTopic = -1

UnusedRequestIndex uint64 = 0
)
Expand Down Expand Up @@ -81,6 +82,7 @@ type config struct {
reportCommitterReadBufferSize int
reportCommitterWriteBufferSize int
maxTopicsCount int32
maxLogStreamsCountPerTopic int32
telemetryCollectorName string
telemetryCollectorEndpoint string
logger *zap.Logger
Expand Down Expand Up @@ -108,6 +110,7 @@ func newConfig(opts []Option) (config, error) {
reportCommitterReadBufferSize: DefaultReportCommitterReadBufferSize,
reportCommitterWriteBufferSize: DefaultReportCommitterWriteBufferSize,
maxTopicsCount: DefaultMaxTopicsCount,
maxLogStreamsCountPerTopic: DefaultMaxLogStreamsCountPerTopic,
telemetryCollectorName: DefaultTelemetryCollectorName,
telemetryCollectorEndpoint: DefaultTelmetryCollectorEndpoint,
logger: zap.NewNop(),
Expand Down Expand Up @@ -343,6 +346,12 @@ func WithMaxTopicsCount(maxTopicsCount int32) Option {
})
}

func WithMaxLogStreamsCountPerTopic(maxLogStreamsCountPerTopic int32) Option {
return newFuncOption(func(cfg *config) {
cfg.maxLogStreamsCountPerTopic = maxLogStreamsCountPerTopic
})
}

func WithTelemetryCollectorName(telemetryCollectorName string) Option {
return newFuncOption(func(cfg *config) {
cfg.telemetryCollectorName = telemetryCollectorName
Expand Down
2 changes: 2 additions & 0 deletions internal/metarepos/raft_metadata_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ func NewRaftMetadataRepository(opts ...Option) *RaftMetadataRepository {

mr.storage = NewMetadataStorage(mr.sendAck, cfg.snapCount, mr.logger.Named("storage"))
mr.storage.limits.maxTopicsCount = mr.maxTopicsCount
mr.storage.limits.maxLogStreamsCountPerTopic = mr.maxLogStreamsCountPerTopic

mr.membership = mr.storage

mr.listenNotifyC = make(chan struct{})
Expand Down
53 changes: 53 additions & 0 deletions internal/metarepos/raft_metadata_repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2487,6 +2487,59 @@ func TestMetadataRepository_MaxTopicsCount(t *testing.T) {
})
}

func TestMetadataRepository_MaxLogStreamsCountPerTopic(t *testing.T) {
const (
numNodes = 1
repFactor = 1
increaseUncommit = false

snid = types.StorageNodeID(1)
tpid = types.TopicID(1)
)

Convey("MaxLogStreamsCountPerTopic", t, func(C) {
clus := newMetadataRepoCluster(numNodes, repFactor, increaseUncommit)
Reset(func() {
clus.closeNoErrors(t)
})

So(clus.Start(), ShouldBeNil)
So(testutil.CompareWaitN(10, func() bool {
return clus.healthCheckAll()
}), ShouldBeTrue)

mr := clus.nodes[0]
ctx := context.Background()

err := mr.RegisterStorageNode(ctx, &varlogpb.StorageNodeDescriptor{
StorageNode: varlogpb.StorageNode{
StorageNodeID: snid,
},
})
So(err, ShouldBeNil)

err = mr.RegisterTopic(ctx, tpid)
So(err, ShouldBeNil)

Convey("Limit is zero", func(C) {
mr.storage.limits.maxLogStreamsCountPerTopic = 0
err := mr.RegisterLogStream(ctx, makeLogStream(tpid, types.LogStreamID(1), []types.StorageNodeID{snid}))
So(err, ShouldNotBeNil)
So(status.Code(err), ShouldEqual, codes.ResourceExhausted)
})

Convey("Limit is one", func(C) {
mr.storage.limits.maxLogStreamsCountPerTopic = 1
err := mr.RegisterLogStream(ctx, makeLogStream(tpid, types.LogStreamID(1), []types.StorageNodeID{snid}))
So(err, ShouldBeNil)

err = mr.RegisterLogStream(ctx, makeLogStream(tpid, types.LogStreamID(2), []types.StorageNodeID{snid}))
So(err, ShouldNotBeNil)
So(status.Code(err), ShouldEqual, codes.ResourceExhausted)
})
})
}

func TestMRTopicLastHighWatermark(t *testing.T) {
Convey("given metadata repository with multiple topics", t, func(ctx C) {
nrTopics := 3
Expand Down
8 changes: 7 additions & 1 deletion internal/metarepos/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ type MetadataStorage struct {
running atomicutil.AtomicBool

limits struct {
maxTopicsCount int32
maxTopicsCount int32
maxLogStreamsCountPerTopic int32
}

logger *zap.Logger
Expand All @@ -148,6 +149,7 @@ func NewMetadataStorage(cb func(uint64, uint64, error), snapCount uint64, logger
}
ms.snapCount = snapCount
ms.limits.maxTopicsCount = DefaultMaxTopicsCount
ms.limits.maxLogStreamsCountPerTopic = DefaultMaxLogStreamsCountPerTopic

ms.origStateMachine = &mrpb.MetadataRepositoryDescriptor{}
ms.origStateMachine.Metadata = &varlogpb.MetadataDescriptor{}
Expand Down Expand Up @@ -448,6 +450,10 @@ func (ms *MetadataStorage) registerLogStream(ls *varlogpb.LogStreamDescriptor) e
ms.mtMu.Lock()
defer ms.mtMu.Unlock()

if limit, curr := ms.limits.maxLogStreamsCountPerTopic, len(topic.LogStreams); limit >= 0 && curr >= int(limit) {
return status.Errorf(codes.ResourceExhausted, "too many log streams in topic %d: limits=%d actual=%d", ls.TopicID, limit, curr)
}

if err := cur.Metadata.UpsertLogStream(ls); err != nil {
return err
}
Expand Down
52 changes: 52 additions & 0 deletions internal/metarepos/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1970,6 +1970,58 @@ func TestStorage_MaxTopicsCount(t *testing.T) {
}
}

func TestStorage_MaxLogStreamsCountPerTopic(t *testing.T) {
const snid = types.StorageNodeID(1)
const tpid = types.TopicID(1)

tcs := []struct {
name string
maxLogStreamsCountPerTopic int32
testf func(t *testing.T, ms *MetadataStorage)
}{
{
name: "LimitZero",
maxLogStreamsCountPerTopic: 0,
testf: func(t *testing.T, ms *MetadataStorage) {
err := ms.registerLogStream(makeLogStream(tpid, types.LogStreamID(1), []types.StorageNodeID{snid}))
require.Error(t, err)
require.Equal(t, codes.ResourceExhausted, status.Code(err))
},
},
{
name: "LimitOne",
maxLogStreamsCountPerTopic: 1,
testf: func(t *testing.T, ms *MetadataStorage) {
err := ms.registerLogStream(makeLogStream(tpid, types.LogStreamID(1), []types.StorageNodeID{snid}))
require.NoError(t, err)

err = ms.registerLogStream(makeLogStream(tpid, types.LogStreamID(2), []types.StorageNodeID{snid}))
require.Error(t, err)
require.Equal(t, codes.ResourceExhausted, status.Code(err))
},
},
}

for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
ms := NewMetadataStorage(nil, DefaultSnapshotCount, zaptest.NewLogger(t))
ms.limits.maxLogStreamsCountPerTopic = tc.maxLogStreamsCountPerTopic

err := ms.registerStorageNode(&varlogpb.StorageNodeDescriptor{
StorageNode: varlogpb.StorageNode{
StorageNodeID: snid,
},
})
require.NoError(t, err)

err = ms.registerTopic(&varlogpb.TopicDescriptor{TopicID: tpid})
require.NoError(t, err)

tc.testf(t, ms)
})
}
}

func TestStorageSortedTopicLogStreamIDs(t *testing.T) {
Convey("UncommitReport should be committed", t, func(ctx C) {
/*
Expand Down

0 comments on commit ad2a60f

Please sign in to comment.