Skip to content

Commit

Permalink
feat(admin): defines error codes of several RPCs in the admin server
Browse files Browse the repository at this point in the history
This patch defines the gRPC error codes of various RPCs in the admin server. Previously, we depended
on the `verrors` package to identify the cause of errors. However, the `verrors` package is vague
and an anti-pattern. To handle errors explicitly, we are trying to adopt gRPC error codes.
Although this PR doesn't convert everything, it can be a starting point for our robust error
handling.

Updates #312
  • Loading branch information
ijsong committed Jan 19, 2023
1 parent fe5e676 commit f5ed66f
Show file tree
Hide file tree
Showing 3 changed files with 242 additions and 71 deletions.
87 changes: 41 additions & 46 deletions internal/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package admin

import (
"context"
"fmt"
"net"
"sort"
"sync"
Expand All @@ -21,7 +20,6 @@ import (
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/kakao/varlog/internal/admin/admerrors"
"github.com/kakao/varlog/internal/admin/snwatcher"
"github.com/kakao/varlog/pkg/types"
"github.com/kakao/varlog/pkg/util/netutil"
Expand Down Expand Up @@ -174,7 +172,7 @@ func (adm *Admin) getStorageNode(ctx context.Context, snid types.StorageNodeID)

md, err := adm.mrmgr.ClusterMetadataView().ClusterMetadata(ctx)
if err != nil {
return nil, status.Errorf(codes.Unavailable, "get storage node: cluster metadata not fetched")
return nil, status.Errorf(codes.Unavailable, "get storage node: %s", err.Error())
}
snd := md.GetStorageNode(snid)
if snd == nil {
Expand Down Expand Up @@ -220,7 +218,7 @@ func (adm *Admin) listStorageNodes(ctx context.Context) ([]admpb.StorageNodeMeta

md, err := adm.mrmgr.ClusterMetadataView().ClusterMetadata(ctx)
if err != nil {
return nil, status.Errorf(codes.Unavailable, "list storage nodes: cluster metadata not fetched")
return nil, status.Errorf(codes.Unavailable, "list storage nodes: %s", err.Error())
}

lazyInit := false
Expand Down Expand Up @@ -298,24 +296,25 @@ func (adm *Admin) addStorageNode(ctx context.Context, snid types.StorageNodeID,

snmd, err := adm.snmgr.GetMetadataByAddress(ctx, snid, addr)
if err != nil {
return nil, err
return nil, status.Errorf(status.Code(err), "add storage node: %s", err.Error())
}

now := time.Now().UTC()
snd := snmd.ToStorageNodeDescriptor()
snd.Status = varlogpb.StorageNodeStatusRunning
snd.CreateTime = now
if err = adm.mrmgr.RegisterStorageNode(ctx, snd); err != nil {
return nil, err
err = adm.mrmgr.RegisterStorageNode(ctx, snd)
if err != nil {
return nil, status.Errorf(status.Code(err), "add storage node: %s", err.Error())
}

adm.snmgr.AddStorageNode(ctx, snmd.StorageNode.StorageNodeID, addr)
adm.statRepository.Report(ctx, snmd, now)
snm, ok := adm.statRepository.GetStorageNode(snid)
if !ok {
return nil, fmt.Errorf("add storage node: temporal failure")
return nil, status.Error(codes.Unavailable, "add storage node: call again")
}
return snm, err
return snm, nil
}

func (adm *Admin) unregisterStorageNode(ctx context.Context, snid types.StorageNodeID) error {
Expand All @@ -324,7 +323,7 @@ func (adm *Admin) unregisterStorageNode(ctx context.Context, snid types.StorageN

clusmeta, err := adm.mrmgr.ClusterMetadataView().ClusterMetadata(ctx)
if err != nil {
return err
return status.Errorf(codes.Unavailable, "unregister storage node: %s", err.Error())
}

if clusmeta.GetStorageNode(snid) == nil {
Expand All @@ -335,13 +334,13 @@ func (adm *Admin) unregisterStorageNode(ctx context.Context, snid types.StorageN
for _, lsdesc := range clusmeta.GetLogStreams() {
for _, replica := range lsdesc.GetReplicas() {
if replica.GetStorageNodeID() == snid {
return errors.WithMessagef(admerrors.ErrNotIdleReplicas, "unregister storage node")
return status.Errorf(codes.FailedPrecondition, "unregister storage node: non-idle replicas: %s", replica.String())
}
}
}

if err := adm.mrmgr.UnregisterStorageNode(ctx, snid); err != nil {
return err
return status.Errorf(status.Code(err), "unregister storage node: %s", err.Error())
}

adm.snmgr.RemoveStorageNode(snid)
Expand All @@ -352,11 +351,11 @@ func (adm *Admin) unregisterStorageNode(ctx context.Context, snid types.StorageN
func (adm *Admin) getTopic(ctx context.Context, tpid types.TopicID) (*varlogpb.TopicDescriptor, error) {
md, err := adm.mrmgr.ClusterMetadataView().ClusterMetadata(ctx)
if err != nil {
return nil, err
return nil, status.Errorf(codes.Unavailable, "get topic: %s", err.Error())
}
td := md.GetTopic(tpid)
if td == nil {
return nil, errors.WithMessagef(admerrors.ErrNoSuchTopic, "get topic %d", int32(tpid))
return nil, status.Errorf(codes.NotFound, "get topic: no such topic %d", tpid)
}
return td, nil
}
Expand All @@ -366,8 +365,8 @@ func (adm *Admin) listTopics(ctx context.Context) ([]varlogpb.TopicDescriptor, e
defer adm.mu.Unlock()

md, err := adm.mrmgr.ClusterMetadataView().ClusterMetadata(ctx)
if err != nil || len(md.Topics) == 0 {
return nil, err
if err != nil {
return nil, status.Errorf(codes.Unavailable, "list topics: %s", err.Error())
}

tds := make([]varlogpb.TopicDescriptor, len(md.Topics))
Expand All @@ -385,7 +384,7 @@ func (adm *Admin) addTopic(ctx context.Context) (*varlogpb.TopicDescriptor, erro
// Note that the metadata repository accepts redundant RegisterTopic
// RPC only if the topic has no log streams.
if err := adm.mrmgr.RegisterTopic(ctx, topicID); err != nil {
return nil, err
return nil, status.Errorf(status.Code(err), "add topic: %s", err.Error())
}

return &varlogpb.TopicDescriptor{TopicID: topicID}, nil
Expand All @@ -397,7 +396,7 @@ func (adm *Admin) unregisterTopic(ctx context.Context, tpid types.TopicID) error

clusmeta, err := adm.mrmgr.ClusterMetadataView().ClusterMetadata(ctx)
if err != nil {
return err
return status.Errorf(codes.Unavailable, "unregister topic: %s", err.Error())
}

// TODO: Should it returns an error when removing the topic that has already been deleted or does not exist?
Expand All @@ -420,33 +419,33 @@ func (adm *Admin) unregisterTopic(ctx context.Context, tpid types.TopicID) error
func (adm *Admin) getLogStream(ctx context.Context, tpid types.TopicID, lsid types.LogStreamID) (*varlogpb.LogStreamDescriptor, error) {
md, err := adm.mrmgr.ClusterMetadataView().ClusterMetadata(ctx)
if err != nil {
return nil, errors.WithMessage(err, "get log stream")
return nil, status.Errorf(codes.Unavailable, "get log stream: %s", err.Error())
}
td := md.GetTopic(tpid)
if td == nil {
return nil, errors.WithMessagef(admerrors.ErrNoSuchTopic, "get log stream: tpid %d", int32(tpid))
return nil, status.Errorf(codes.NotFound, "get log stream: no such topic: tpid %d", tpid)
}
if !td.HasLogStream(lsid) {
return nil, errors.WithMessagef(admerrors.ErrNoSuchLogStream, "get log stream: no log stream in topic: lsid %d", int32(lsid))
return nil, status.Errorf(codes.NotFound, "get log stream: no log stream in the topic: tpid %d, lsid %d", tpid, lsid)
}
lsd := md.GetLogStream(lsid)
if lsd == nil {
return nil, errors.WithMessagef(admerrors.ErrNoSuchLogStream, "get log stream: lsid %d", int32(lsid))
return nil, status.Errorf(codes.NotFound, "get log stream: no log stream: lsid %d", lsid)
}
if lsd.TopicID != tpid {
return nil, fmt.Errorf("get log stream: unexpected topic: expected %d, actual %d", int32(tpid), int32(lsd.TopicID))
return nil, status.Errorf(codes.Internal, "get log stream: unexpected topic: expected %d, actual %d", tpid, lsd.TopicID)
}
return lsd, nil
}

func (adm *Admin) listLogStreams(ctx context.Context, tpid types.TopicID) ([]varlogpb.LogStreamDescriptor, error) {
md, err := adm.mrmgr.ClusterMetadataView().ClusterMetadata(ctx)
if err != nil {
return nil, errors.WithMessage(err, "list log streams")
return nil, status.Errorf(codes.Unavailable, "list log streams: %s", err.Error())
}
td := md.GetTopic(tpid)
if td == nil {
return nil, errors.WithMessagef(admerrors.ErrNoSuchTopic, "list log streams: tpid %d", int32(tpid))
return nil, status.Errorf(codes.NotFound, "list log streams: no such topic: tpid %d", tpid)
}
lsds := make([]varlogpb.LogStreamDescriptor, 0, len(td.LogStreams))
for _, lsid := range td.LogStreams {
Expand All @@ -455,7 +454,7 @@ func (adm *Admin) listLogStreams(ctx context.Context, tpid types.TopicID) ([]var
continue
}
if lsd.TopicID != tpid {
return nil, fmt.Errorf("list log streams: unexpected topic: expected %d, actual %d", int32(tpid), int32(lsd.TopicID))
return nil, status.Errorf(codes.Internal, "list log streams: unexpected topic: lsid %d, expected %d, actual %d", lsd.LogStreamID, tpid, lsd.TopicID)
}
lsds = append(lsds, *lsd)
}
Expand Down Expand Up @@ -540,17 +539,6 @@ func (adm *Admin) addLogStreamInternal(ctx context.Context, tpid types.TopicID,

lsid := adm.lsidGen.Generate()

// duplicated by verifyLogStream
/*
if err := clusmeta.MustNotHaveLogStream(logStreamID); err != nil {
if e := adm.lsidGen.Refresh(ctx); e != nil {
err = multierr.Append(err, e)
adm.logger.Panic("could not refresh LogStreamIDGenerator", zap.Error(err))
}
return nil, err
}
*/

lsd := &varlogpb.LogStreamDescriptor{
TopicID: tpid,
LogStreamID: lsid,
Expand All @@ -560,7 +548,7 @@ func (adm *Admin) addLogStreamInternal(ctx context.Context, tpid types.TopicID,

clusmeta, err := adm.mrmgr.ClusterMetadataView().ClusterMetadata(ctx)
if err != nil {
return nil, err
return nil, status.Errorf(status.Code(err), "add log stream: %s", err.Error())
}
if err := adm.verifyLogStream(clusmeta, lsd); err != nil {
return nil, err
Expand All @@ -569,29 +557,33 @@ func (adm *Admin) addLogStreamInternal(ctx context.Context, tpid types.TopicID,
// TODO: Choose the primary - e.g., shuffle logStreamReplicaMetas
lsd, err = adm.snmgr.AddLogStream(ctx, lsd)
if err != nil {
return nil, err
return nil, status.Errorf(status.Code(err), "add log stream: %s", err.Error())
}

// NB: RegisterLogStream returns nil if the logstream already exists.
return lsd, adm.mrmgr.RegisterLogStream(ctx, lsd)
err = adm.mrmgr.RegisterLogStream(ctx, lsd)
if err != nil {
return nil, status.Errorf(status.Code(err), "add log stream: %s", err.Error())
}
return lsd, nil
}

func (adm *Admin) verifyLogStream(clusmeta *varlogpb.MetadataDescriptor, lsdesc *varlogpb.LogStreamDescriptor) error {
replicas := lsdesc.GetReplicas()
// the number of logstream replica
if uint(len(replicas)) != adm.replicationFactor {
return errors.Errorf("invalid number of log stream replicas: %d", len(replicas))
return status.Errorf(codes.FailedPrecondition, "add log stream: invalid number of log stream replicas: expected %d, actual %d", adm.replicationFactor, len(replicas))
}
// storagenode existence
for _, replica := range replicas {
if _, err := clusmeta.MustHaveStorageNode(replica.GetStorageNodeID()); err != nil {
return err
return status.Errorf(codes.FailedPrecondition, "add log stream: no such storage node: snid %d", replica.StorageNodeID)
}
}
// logstream existence
if err := clusmeta.MustNotHaveLogStream(lsdesc.GetLogStreamID()); err != nil {
_ = adm.lsidGen.Refresh(context.TODO())
return err
return status.Errorf(codes.FailedPrecondition, "add log stream: duplicated log stream id: %d", lsdesc.LogStreamID)
}
return nil
}
Expand Down Expand Up @@ -783,14 +775,17 @@ func (adm *Admin) removeLogStreamReplica(ctx context.Context, snid types.Storage

clusmeta, err := adm.mrmgr.ClusterMetadataView().ClusterMetadata(ctx)
if err != nil {
return err
return status.Errorf(codes.Unavailable, "remove log stream replica: %s", err.Error())
}

if err := adm.removableLogStreamReplica(clusmeta, snid, lsid); err != nil {
return err
}

return adm.snmgr.RemoveLogStreamReplica(ctx, snid, tpid, lsid)
if err := adm.snmgr.RemoveLogStreamReplica(ctx, snid, tpid, lsid); err != nil {
return status.Errorf(status.Code(err), "remove log stream replica: %s", err.Error())
}
return nil
}

func (adm *Admin) removableLogStreamReplica(clusmeta *varlogpb.MetadataDescriptor, snid types.StorageNodeID, lsid types.LogStreamID) error {
Expand All @@ -803,7 +798,7 @@ func (adm *Admin) removableLogStreamReplica(clusmeta *varlogpb.MetadataDescripto
replicas := lsdesc.GetReplicas()
for _, replica := range replicas {
if replica.GetStorageNodeID() == snid {
return errors.Wrap(verrors.ErrState, "running log stream is not removable")
return status.Errorf(codes.FailedPrecondition, "remove log stream replica: appendable log stream")
}
}
return nil
Expand Down
Loading

0 comments on commit f5ed66f

Please sign in to comment.