Skip to content

Commit

Permalink
feat(metarepos): add grpc error codes to the metadata repository service
Browse files Browse the repository at this point in the history
  • Loading branch information
hungryjang committed Jan 30, 2023
1 parent de25143 commit 2903f8c
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 56 deletions.
9 changes: 6 additions & 3 deletions internal/metarepos/raft_metadata_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@ import (
"go.opentelemetry.io/otel/attribute"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/gogo/status"

"github.com/kakao/varlog/internal/reportcommitter"
"github.com/kakao/varlog/pkg/types"
"github.com/kakao/varlog/pkg/util/container/set"
Expand Down Expand Up @@ -1350,7 +1353,7 @@ func (mr *RaftMetadataRepository) Unseal(ctx context.Context, lsID types.LogStre
func (mr *RaftMetadataRepository) AddPeer(ctx context.Context, _ types.ClusterID, nodeID types.NodeID, url string) error {
if mr.membership.IsMember(nodeID) ||
mr.membership.IsLearner(nodeID) {
return verrors.ErrAlreadyExists
return status.Errorf(codes.AlreadyExists, "node %d, addr:%s", nodeID, url)
}

r := raftpb.ConfChange{
Expand Down Expand Up @@ -1382,7 +1385,7 @@ func (mr *RaftMetadataRepository) AddPeer(ctx context.Context, _ types.ClusterID
func (mr *RaftMetadataRepository) RemovePeer(ctx context.Context, _ types.ClusterID, nodeID types.NodeID) error {
if !mr.membership.IsMember(nodeID) &&
!mr.membership.IsLearner(nodeID) {
return verrors.ErrNotExist
return status.Errorf(codes.NotFound, "node %d", nodeID)
}

r := raftpb.ConfChange{
Expand Down Expand Up @@ -1422,7 +1425,7 @@ func (mr *RaftMetadataRepository) registerEndpoint(ctx context.Context) {

func (mr *RaftMetadataRepository) GetClusterInfo(context.Context, types.ClusterID) (*mrpb.ClusterInfo, error) {
if !mr.IsMember() {
return nil, verrors.ErrNotMember
return nil, status.Errorf(codes.PermissionDenied, "not member")
}

peerMap := mr.membership.GetPeers()
Expand Down
3 changes: 1 addition & 2 deletions internal/metarepos/raft_metadata_repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/kakao/varlog/pkg/types"
"github.com/kakao/varlog/pkg/util/testutil"
"github.com/kakao/varlog/pkg/util/testutil/ports"
"github.com/kakao/varlog/pkg/verrors"
"github.com/kakao/varlog/proto/mrpb"
"github.com/kakao/varlog/proto/snpb"
"github.com/kakao/varlog/proto/varlogpb"
Expand Down Expand Up @@ -1753,7 +1752,7 @@ func TestMRFailoverJoinNewNode(t *testing.T) {

Convey("Then it should not have member info", func(ctx C) {
_, err := clus.nodes[newNode].GetClusterInfo(context.TODO(), types.ClusterID(0))
So(err, ShouldResemble, verrors.ErrNotMember)
So(status.Code(err), ShouldEqual, codes.PermissionDenied)

cinfo, err := clus.nodes[0].GetClusterInfo(context.TODO(), types.ClusterID(0))
So(err, ShouldBeNil)
Expand Down
72 changes: 36 additions & 36 deletions internal/metarepos/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ func (ms *MetadataStorage) registerStorageNode(sn *varlogpb.StorageNodeDescripto
old := ms.lookupStorageNode(sn.StorageNodeID)
equal := old.Equal(sn)
if old != nil && !equal {
return verrors.ErrAlreadyExists
return status.Errorf(codes.AlreadyExists, "sn %d, addr:%s", sn.StorageNodeID, old.Address)
}

if equal {
Expand All @@ -307,7 +307,7 @@ func (ms *MetadataStorage) registerStorageNode(sn *varlogpb.StorageNodeDescripto

err := cur.Metadata.UpsertStorageNode(sn)
if err != nil {
return err
return status.Errorf(codes.ResourceExhausted, "upsert sn %d, %s", sn.StorageNodeID, err.Error())
}

ms.metaAppliedIndex++
Expand Down Expand Up @@ -341,11 +341,11 @@ func (ms *MetadataStorage) unregistableStorageNode(snID types.StorageNodeID) boo

func (ms *MetadataStorage) unregisterStorageNode(snID types.StorageNodeID) error {
if ms.lookupStorageNode(snID) == nil {
return verrors.ErrNotExist
return status.Errorf(codes.NotFound, "sn %d", snID)
}

if !ms.unregistableStorageNode(snID) {
return verrors.ErrInvalidArgument
return status.Errorf(codes.InvalidArgument, "unregistable sn %d", snID)
}

pre, cur := ms.getStateMachine()
Expand Down Expand Up @@ -420,24 +420,24 @@ func (ms *MetadataStorage) UnregisterStorageNode(snID types.StorageNodeID, nodeI

func (ms *MetadataStorage) registerLogStream(ls *varlogpb.LogStreamDescriptor) error {
if len(ls.Replicas) == 0 {
return verrors.ErrInvalidArgument
return status.Errorf(codes.InvalidArgument, "empty replicas")
}

topic := ms.lookupTopic(ls.TopicID)
if topic == nil {
return verrors.ErrInvalidArgument
return status.Errorf(codes.InvalidArgument, "empty replicas")
}

for _, r := range ls.Replicas {
if ms.lookupStorageNode(r.StorageNodeID) == nil {
return verrors.ErrInvalidArgument
return status.Errorf(codes.InvalidArgument, "sn %d is not exist", r.StorageNodeID)
}
}

old := ms.lookupLogStream(ls.LogStreamID)
equal := old.Equal(ls)
if old != nil && !equal {
return verrors.ErrAlreadyExists
return status.Errorf(codes.AlreadyExists, "ls %d", ls.LogStreamID)
}

if equal {
Expand All @@ -455,7 +455,7 @@ func (ms *MetadataStorage) registerLogStream(ls *varlogpb.LogStreamDescriptor) e
}

if err := cur.Metadata.UpsertLogStream(ls); err != nil {
return err
return status.Errorf(codes.ResourceExhausted, "upsert ls %d, %s", ls.LogStreamID, err.Error())
}

lm := &mrpb.LogStreamUncommitReports{
Expand All @@ -482,7 +482,7 @@ func (ms *MetadataStorage) registerLogStream(ls *varlogpb.LogStreamDescriptor) e
topic = proto.Clone(topic).(*varlogpb.TopicDescriptor)
topic.InsertLogStream(ls.LogStreamID)
if err := cur.Metadata.UpsertTopic(topic); err != nil {
return err
return status.Errorf(codes.ResourceExhausted, "upsert topic %d, ls %d, %s", topic.TopicID, ls.LogStreamID, err.Error())
}

ms.metaAppliedIndex++
Expand Down Expand Up @@ -511,7 +511,7 @@ func (ms *MetadataStorage) RegisterLogStream(ls *varlogpb.LogStreamDescriptor, n
func (ms *MetadataStorage) unregisterLogStream(lsID types.LogStreamID) error {
ls := ms.lookupLogStream(lsID)
if ls == nil {
return verrors.ErrNotExist
return status.Errorf(codes.NotFound, "ls %d", lsID)
}

pre, cur := ms.getStateMachine()
Expand Down Expand Up @@ -558,18 +558,18 @@ func (ms *MetadataStorage) UnregisterLogStream(lsID types.LogStreamID, nodeIndex

func (ms *MetadataStorage) updateLogStream(ls *varlogpb.LogStreamDescriptor) error {
if len(ls.Replicas) == 0 {
return verrors.ErrInvalidArgument
return status.Errorf(codes.InvalidArgument, "empty replicas")
}

for _, r := range ls.Replicas {
if ms.lookupStorageNode(r.StorageNodeID) == nil {
return verrors.ErrInvalidArgument
return status.Errorf(codes.InvalidArgument, "sn %d is not exist", r.StorageNodeID)
}
}

old := ms.lookupLogStream(ls.LogStreamID)
if old == nil {
return verrors.ErrNotExist
return status.Errorf(codes.NotFound, "ls %d is not exist", ls.LogStreamID)
}

if equal := old.Equal(ls); equal {
Expand All @@ -578,11 +578,11 @@ func (ms *MetadataStorage) updateLogStream(ls *varlogpb.LogStreamDescriptor) err
}

if !old.Status.Sealed() {
return verrors.ErrState
return status.Errorf(codes.FailedPrecondition, "ls %d is not sealed", ls.LogStreamID)
}

if !ms.willBeSealed(ls) {
return verrors.ErrState
return status.Errorf(codes.FailedPrecondition, "ls %d will not be sealed", ls.LogStreamID)
}

_, cur := ms.getStateMachine()
Expand All @@ -592,7 +592,7 @@ func (ms *MetadataStorage) updateLogStream(ls *varlogpb.LogStreamDescriptor) err

err := cur.Metadata.UpsertLogStream(ls)
if err != nil {
return err
return status.Errorf(codes.ResourceExhausted, "upsert ls %d, %s", ls.LogStreamID, err.Error())
}

ms.metaAppliedIndex++
Expand Down Expand Up @@ -659,7 +659,7 @@ func (ms *MetadataStorage) registerTopic(topic *varlogpb.TopicDescriptor) error
old := ms.lookupTopic(topic.TopicID)
equal := old.Equal(topic)
if old != nil && !equal {
return verrors.ErrAlreadyExists
return status.Errorf(codes.AlreadyExists, "topic %d", topic.TopicID)
}

if equal {
Expand Down Expand Up @@ -693,7 +693,7 @@ func (ms *MetadataStorage) registerTopic(topic *varlogpb.TopicDescriptor) error
}

if err := cur.Metadata.UpsertTopic(topic); err != nil {
return err
return status.Errorf(codes.ResourceExhausted, "upsert topic %d, %s", topic.TopicID, err.Error())
}

ms.metaAppliedIndex++
Expand All @@ -715,7 +715,7 @@ func (ms *MetadataStorage) UnregisterTopic(topicID types.TopicID, nodeIndex, req

func (ms *MetadataStorage) unregisterTopic(topicID types.TopicID) error {
if ms.lookupTopic(topicID) == nil {
return verrors.ErrNotExist
return status.Errorf(codes.NotFound, "topic %d", topicID)
}

pre, cur := ms.getStateMachine()
Expand Down Expand Up @@ -801,20 +801,20 @@ func (ms *MetadataStorage) UpdateLogStream(ls *varlogpb.LogStreamDescriptor, nod
return nil
}

func (ms *MetadataStorage) updateLogStreamDescStatus(lsID types.LogStreamID, status varlogpb.LogStreamStatus) error {
func (ms *MetadataStorage) updateLogStreamDescStatus(lsID types.LogStreamID, st varlogpb.LogStreamStatus) error {
ls := ms.lookupLogStream(lsID)
if ls == nil {
return verrors.ErrNotExist
return status.Errorf(codes.NotFound, "ls %d", lsID)
}

ls = proto.Clone(ls).(*varlogpb.LogStreamDescriptor)

if ls.Status == status ||
(ls.Status.Sealed() && status == varlogpb.LogStreamStatusSealing) {
if ls.Status == st ||
(ls.Status.Sealed() && st == varlogpb.LogStreamStatusSealing) {
return nil
}

ls.Status = status
ls.Status = st

_, cur := ms.getStateMachine()

Expand All @@ -827,13 +827,13 @@ func (ms *MetadataStorage) updateLogStreamDescStatus(lsID types.LogStreamID, sta

ms.logger.Info("update log stream status",
zap.Int32("lsid", int32(lsID)),
zap.String("status", status.String()),
zap.String("status", st.String()),
)

return nil
}

func (ms *MetadataStorage) updateUncommitReportStatus(lsID types.LogStreamID, status varlogpb.LogStreamStatus) error {
func (ms *MetadataStorage) updateUncommitReportStatus(lsID types.LogStreamID, st varlogpb.LogStreamStatus) error {
pre, cur := ms.getStateMachine()

lls, ok := cur.LogStream.UncommitReports[lsID]
Expand All @@ -842,21 +842,21 @@ func (ms *MetadataStorage) updateUncommitReportStatus(lsID types.LogStreamID, st
if !ok {
ms.logger.Error("update uncommit report status. not exist lsid",
zap.Int32("lsid", int32(lsID)),
zap.String("status", status.String()),
zap.String("status", st.String()),
)
return verrors.ErrInternal
return status.Errorf(codes.Internal, "update uncommit report status. not exist ls %d, status: %s", lsID, st.String())
}

lls = proto.Clone(o).(*mrpb.LogStreamUncommitReports)
cur.LogStream.UncommitReports[lsID] = lls
}

if lls.Status == status ||
(lls.Status.Sealed() && status == varlogpb.LogStreamStatusSealing) {
if lls.Status == st ||
(lls.Status.Sealed() && st == varlogpb.LogStreamStatusSealing) {
return nil
}

if status.Sealed() {
if st.Sealed() {
min := types.InvalidLLSN
for _, r := range lls.Replicas {
if min.Invalid() || min > r.UncommittedLLSNEnd() {
Expand All @@ -869,12 +869,12 @@ func (ms *MetadataStorage) updateUncommitReportStatus(lsID types.LogStreamID, st
ms.logger.Error("update uncommit report status. replica seal fail",
zap.Int32("lsid", int32(lsID)),
zap.Int32("snid", int32(storageNodeID)),
zap.String("status", status.String()),
zap.String("status", st.String()),
zap.Uint64("min", uint64(min)),
zap.Uint64("begin", uint64(r.UncommittedLLSNOffset)),
zap.Uint64("end", uint64(r.UncommittedLLSNEnd())),
)
return verrors.ErrInternal
return status.Errorf(codes.Internal, "update uncommit report status. replica seal fail. ls %d, status: %s", lsID, st.String())
}
lls.Replicas[storageNodeID] = r
}
Expand All @@ -886,11 +886,11 @@ func (ms *MetadataStorage) updateUncommitReportStatus(lsID types.LogStreamID, st
}
}

lls.Status = status
lls.Status = st

ms.logger.Info("update uncommit report status",
zap.Int32("lsid", int32(lsID)),
zap.String("status", status.String()),
zap.String("status", st.String()),
)

return nil
Expand Down
Loading

0 comments on commit 2903f8c

Please sign in to comment.