Skip to content

Commit

Permalink
feat(admin): return ResourceExhausted if the log streams count overflows
Browse files Browse the repository at this point in the history
The admin server and client respect the error code ResourceExhausted if the log stream count reaches
the upper limit.
  • Loading branch information
ijsong committed Jan 3, 2023
1 parent 270d3b7 commit b15f29e
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 5 deletions.
6 changes: 3 additions & 3 deletions internal/admin/mrmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,10 +257,10 @@ func (mrm *mrManager) RegisterLogStream(ctx context.Context, logStreamDesc *varl
}

if err := cli.RegisterLogStream(ctx, logStreamDesc); err != nil {
return multierr.Append(err, cli.Close())
_ = cli.Close()
return err
}

return err
return nil
}

func (mrm *mrManager) UnregisterLogStream(ctx context.Context, logStreamID types.LogStreamID) error {
Expand Down
7 changes: 6 additions & 1 deletion internal/admin/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

pbtypes "github.com/gogo/protobuf/types"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/kakao/varlog/pkg/types"
"github.com/kakao/varlog/pkg/verrors"
Expand Down Expand Up @@ -105,7 +106,11 @@ func (s *server) DescribeTopic(ctx context.Context, req *vmspb.DescribeTopicRequ
func (s *server) AddLogStream(ctx context.Context, req *vmspb.AddLogStreamRequest) (*vmspb.AddLogStreamResponse, error) {
logStreamDesc, err := s.admin.addLogStream(ctx, req.GetTopicID(), req.GetReplicas())
if err != nil {
return nil, verrors.ToStatusErrorWithCode(err, codes.Unavailable)
code := status.Code(err)
if code != codes.ResourceExhausted {
code = codes.Unavailable
}
return nil, verrors.ToStatusErrorWithCode(err, code)
}
return &vmspb.AddLogStreamResponse{LogStream: logStreamDesc}, nil
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/mrc/metadata_repository_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,13 @@ func (c *metadataRepositoryClient) RegisterLogStream(ctx context.Context, ls *va
LogStream: ls,
}
_, err := c.client.RegisterLogStream(ctx, req)
return verrors.FromStatusError(errors.WithStack(err))
if err != nil {
if code := status.Code(err); code == codes.ResourceExhausted {
return err
}
return verrors.FromStatusError(errors.WithStack(err))
}
return nil
}

func (c *metadataRepositoryClient) UnregisterLogStream(ctx context.Context, lsID types.LogStreamID) error {
Expand Down
2 changes: 2 additions & 0 deletions pkg/varlog/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ type Admin interface {
// Deprecated: Use ListLogStreams.
DescribeTopic(ctx context.Context, topicID types.TopicID, opts ...AdminCallOption) (*vmspb.DescribeTopicResponse, error)
// AddLogStream adds a new log stream to the topic tpid.
// It returns the error code ResourceExhausted if the number of log streams
// is reached the upper limit.
//
// The admin server chooses proper replicas if the argument replicas are empty.
// Otherwise, if the argument replicas are defined, the admin server
Expand Down
6 changes: 6 additions & 0 deletions proto/vmspb/admin.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions proto/vmspb/admin.proto
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,9 @@ service ClusterManager {

rpc GetLogStream(GetLogStreamRequest) returns (GetLogStreamResponse) {}
rpc ListLogStreams(ListLogStreamsRequest) returns (ListLogStreamsResponse) {}
// AddLogStream adds a new log stream to the cluster.
// The error code ResourceExhausted is returned if the number of log streams
// is reached the upper limit.
rpc AddLogStream(AddLogStreamRequest) returns (AddLogStreamResponse) {}
// UpdateLogStream changes the configuration of replicas in a log stream.
// Its codes are defines as followings:
Expand Down
57 changes: 57 additions & 0 deletions tests/it/management/management_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,13 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/kakao/varlog/internal/admin"
"github.com/kakao/varlog/internal/admin/snwatcher"
"github.com/kakao/varlog/internal/metarepos"
"github.com/kakao/varlog/internal/storagenode"
"github.com/kakao/varlog/pkg/types"
"github.com/kakao/varlog/pkg/util/testutil"
"github.com/kakao/varlog/pkg/varlog"
Expand Down Expand Up @@ -135,6 +138,60 @@ func TestUnregisterLogStream(t *testing.T) {
require.NoError(t, err)
}

func TestAddLogStream_LogStreamsCount(t *testing.T) {
const (
replicationFactor = 1
storageNodeCount = 1
topicCount = 1
maxLogStreamsCount = 2
)

tcs := []struct {
name string
mrOpts []metarepos.Option
snOpts []storagenode.Option
}{
{
name: "StorageNodeLimit",
snOpts: []storagenode.Option{
storagenode.WithMaxLogStreamReplicasCount(maxLogStreamsCount),
},
},
{
name: "MetadataRepositoryLimit",
mrOpts: []metarepos.Option{
metarepos.WithMaxLogStreamsCountPerTopic(maxLogStreamsCount),
},
},
}

for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
clus := it.NewVarlogCluster(t,
it.WithReplicationFactor(replicationFactor),
it.WithNumberOfStorageNodes(storageNodeCount),
it.WithNumberOfTopics(topicCount),
it.WithCustomizedMetadataRepositoryOptions(tc.mrOpts...),
it.WithCustomizedStorageNodeOptions(tc.snOpts...),
)
defer clus.Close(t)

ctx := context.Background()
tpid := clus.TopicIDs()[0]
mc := clus.GetVMSClient(t)

for i := 0; i < maxLogStreamsCount; i++ {
_, err := mc.AddLogStream(ctx, tpid, nil)
require.NoError(t, err)
}

_, err := mc.AddLogStream(ctx, tpid, nil)
require.Error(t, err)
require.Equal(t, codes.ResourceExhausted, status.Code(err))
})
}
}

func TestAddLogStreamWithNotExistedNode(t *testing.T) {
clus := it.NewVarlogCluster(t, it.WithNumberOfTopics(1))
defer clus.Close(t)
Expand Down

0 comments on commit b15f29e

Please sign in to comment.