Skip to content

Commit

Permalink
fix(admin): contains all storage nodes registered to the cluster
Browse files Browse the repository at this point in the history
RPCs GetStorageNode and ListStorageNodes should contain all storage nodes registered to the cluster
even if the admin server is just restarted.
Previously, if the admin server was restarted the stats repository of the admin was flushed, thus,
it had a problem that the RPCs do not return all storage nodes after restarting.
This patch fixes the problem by using the cluster metadata fetched from the metadata repository. If
the metadata repository is not reachable transiently, the RPCs return an error with the status code
`Unavailable`.

Resolves #106
  • Loading branch information
ijsong committed Sep 4, 2022
1 parent b5052ae commit 28c02ab
Show file tree
Hide file tree
Showing 10 changed files with 212 additions and 30 deletions.
52 changes: 47 additions & 5 deletions internal/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ import (
"time"

"github.com/gogo/protobuf/proto"
"github.com/gogo/status"
grpcmiddleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpczap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
grpcctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags"
"github.com/pkg/errors"
"go.uber.org/multierr"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"

Expand Down Expand Up @@ -163,17 +165,57 @@ func (adm *Admin) Metadata(ctx context.Context) (*varlogpb.MetadataDescriptor, e
}

func (adm *Admin) getStorageNode(ctx context.Context, snid types.StorageNodeID) (*vmspb.StorageNodeMetadata, error) {
snm, ok := adm.statRepository.GetStorageNode(snid)
if !ok {
return nil, errors.WithMessagef(admerrors.ErrNoSuchStorageNode, "get storage node %d", int32(snid))
adm.mu.RLock()
defer adm.mu.RUnlock()

md, err := adm.mrmgr.ClusterMetadataView().ClusterMetadata(ctx)
if err != nil {
return nil, status.Errorf(codes.Unavailable, "get storage node: cluster metadata not fetched")
}
snd := md.GetStorageNode(snid)
if snd == nil {
return nil, status.Errorf(codes.NotFound, "get storage node: %d", int32(snid))
}
if snm, ok := adm.statRepository.GetStorageNode(snid); ok {
return snm, nil
}
return snm, nil
return &vmspb.StorageNodeMetadata{
StorageNodeMetadataDescriptor: snpb.StorageNodeMetadataDescriptor{
ClusterID: adm.cid,
StorageNode: snd.StorageNode,
},
CreateTime: snd.CreateTime,
}, nil
}

func (adm *Admin) listStorageNodes(ctx context.Context) ([]vmspb.StorageNodeMetadata, error) {
adm.mu.RLock()
defer adm.mu.RUnlock()
return adm.statRepository.ListStorageNodes(), nil

md, err := adm.mrmgr.ClusterMetadataView().ClusterMetadata(ctx)
if err != nil {
return nil, status.Errorf(codes.Unavailable, "list storage nodes: cluster metadata not fetched")
}

snms := make([]vmspb.StorageNodeMetadata, 0, len(md.StorageNodes))
snmsMap := adm.statRepository.ListStorageNodes()
for _, snd := range md.StorageNodes {
if snm, ok := snmsMap[snd.StorageNodeID]; ok {
snms = append(snms, *snm)
continue
}
snms = append(snms, vmspb.StorageNodeMetadata{
StorageNodeMetadataDescriptor: snpb.StorageNodeMetadataDescriptor{
ClusterID: adm.cid,
StorageNode: snd.StorageNode,
},
CreateTime: snd.CreateTime,
})
}
sort.Slice(snms, func(i, j int) bool {
return snms[i].StorageNode.StorageNodeID < snms[j].StorageNode.StorageNodeID
})
return snms, nil
}

// addStorageNode adds a new storage node to the cluster.
Expand Down
33 changes: 29 additions & 4 deletions internal/admin/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,15 +141,22 @@ func TestAdmin_GetStorageNode(t *testing.T) {
mock.MockClusterMetadataView.EXPECT().ClusterMetadata(gomock.Any()).Return(
&varlogpb.MetadataDescriptor{}, nil,
).AnyTimes()
mock.MockRepository.EXPECT().GetStorageNode(snid).Return(nil, false)
},
},
{
name: "Success",
success: true,
prepare: func(mock *testMock) {
mock.MockClusterMetadataView.EXPECT().ClusterMetadata(gomock.Any()).Return(
&varlogpb.MetadataDescriptor{}, nil,
&varlogpb.MetadataDescriptor{
StorageNodes: []*varlogpb.StorageNodeDescriptor{
{
StorageNode: varlogpb.StorageNode{
StorageNodeID: snid,
},
},
},
}, nil,
).AnyTimes()
mock.MockRepository.EXPECT().GetStorageNode(snid).Return(&vmspb.StorageNodeMetadata{
StorageNodeMetadataDescriptor: snpb.StorageNodeMetadataDescriptor{
Expand All @@ -160,6 +167,24 @@ func TestAdmin_GetStorageNode(t *testing.T) {
}, true)
},
},
{
name: "Success without exact status",
success: true,
prepare: func(mock *testMock) {
mock.MockClusterMetadataView.EXPECT().ClusterMetadata(gomock.Any()).Return(
&varlogpb.MetadataDescriptor{
StorageNodes: []*varlogpb.StorageNodeDescriptor{
{
StorageNode: varlogpb.StorageNode{
StorageNodeID: snid,
},
},
},
}, nil,
).AnyTimes()
mock.MockRepository.EXPECT().GetStorageNode(snid).Return(nil, false)
},
},
}

for _, tc := range tcs {
Expand Down Expand Up @@ -350,8 +375,8 @@ func TestAdmin_ListStorageNodes(t *testing.T) {
}, nil,
).AnyTimes()
mock.MockRepository.EXPECT().ListStorageNodes().Return(
[]vmspb.StorageNodeMetadata{
{
map[types.StorageNodeID]*vmspb.StorageNodeMetadata{
snid: {
StorageNodeMetadataDescriptor: snpb.StorageNodeMetadataDescriptor{
StorageNode: varlogpb.StorageNode{
StorageNodeID: snid,
Expand Down
14 changes: 5 additions & 9 deletions internal/admin/stats/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ package stats

import (
"context"
"sort"
"sync"
"time"

Expand Down Expand Up @@ -43,7 +42,7 @@ type Repository interface {

// ListStorageNodes returns a map that maps storage node ID to the
// metadata for each storage node.
ListStorageNodes() []vmspb.StorageNodeMetadata
ListStorageNodes() map[types.StorageNodeID]*vmspb.StorageNodeMetadata

// RemoveStorageNode removes the metadata for the storage node
// specified by the snid.
Expand Down Expand Up @@ -130,17 +129,14 @@ func (s *repository) GetStorageNode(snid types.StorageNodeID) (*vmspb.StorageNod
return proto.Clone(snm).(*vmspb.StorageNodeMetadata), true
}

func (s *repository) ListStorageNodes() []vmspb.StorageNodeMetadata {
func (s *repository) ListStorageNodes() map[types.StorageNodeID]*vmspb.StorageNodeMetadata {
s.mu.RLock()
defer s.mu.RUnlock()
snms := make([]vmspb.StorageNodeMetadata, 0, len(s.storageNodes))
snms := make(map[types.StorageNodeID]*vmspb.StorageNodeMetadata, len(s.storageNodes))
for _, snm := range s.storageNodes {
copied := *proto.Clone(snm).(*vmspb.StorageNodeMetadata)
snms = append(snms, copied)
copied := proto.Clone(snm).(*vmspb.StorageNodeMetadata)
snms[snm.StorageNodeID] = copied
}
sort.Slice(snms, func(i, j int) bool {
return snms[i].StorageNode.StorageNodeID < snms[j].StorageNode.StorageNodeID
})
return snms
}

Expand Down
4 changes: 2 additions & 2 deletions internal/admin/stats/repository_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

31 changes: 28 additions & 3 deletions pkg/varlog/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,30 @@ type Admin interface {
// TODO (jun): Specify types of errors, for instance, retriable, bad request, server's internal error.

// GetStorageNode returns the metadata of the storage node specified by the argument snid.
// It returns the ErrNotExist error if the storage node does not exist.
// If the admin server does not check the heartbeat of the storage node
// yet, some fields are zero values, for instance,
// LastHeartbeatTime, and Storages, LogStreamReplicas, Status, and
// StartTime of StorageNodeMetadataDescriptor.
// It returns the ErrNotExist if the storage node does not exist.
// It returns the ErrUnavailable if the cluster metadata cannot be fetched from the metadata repository.
GetStorageNode(ctx context.Context, snid types.StorageNodeID) (*vmspb.StorageNodeMetadata, error)
// ListStorageNodes returns a list of storage node metadata.
// If the admin server does not check the heartbeat of the storage node
// yet, some fields are zero values, for instance,
// LastHeartbeatTime, and Storages, LogStreamReplicas, Status, and
// StartTime of StorageNodeMetadataDescriptor.
// It returns the ErrUnavailable if the cluster metadata cannot be fetched from the metadata repository.
//
// Note that it should return an empty slice rather than nil to encode
// to an empty array in JSON if no storage node exists in the cluster.
ListStorageNodes(ctx context.Context) ([]vmspb.StorageNodeMetadata, error)
// GetStorageNodes returns a map of StorageNodeIDs and their addresses.
// If the admin server does not check the heartbeat of the storage node
// yet, some fields are zero values, for instance,
// LastHeartbeatTime, and Storages, LogStreamReplicas, Status, and
// StartTime of StorageNodeMetadataDescriptor.
// It returns the ErrUnavailable if the cluster metadata cannot be fetched from the metadata repository.
//
// Deprecated: Use ListStorageNodes.
GetStorageNodes(ctx context.Context) (map[types.StorageNodeID]vmspb.StorageNodeMetadata, error)
// AddStorageNode registers a storage node, whose ID and address are
Expand Down Expand Up @@ -172,8 +188,11 @@ func (c *admin) GetStorageNode(ctx context.Context, snid types.StorageNodeID) (*
StorageNodeID: snid,
})
if err != nil {
if st := status.Convert(err); st.Code() == codes.NotFound {
code := status.Convert(err).Code()
if code == codes.NotFound {
err = verrors.ErrNotExist
} else if code == codes.Unavailable {
err = verrors.ErrUnavailable
}
return nil, errors.WithMessage(err, "admin: get storage node")
}
Expand All @@ -183,7 +202,13 @@ func (c *admin) GetStorageNode(ctx context.Context, snid types.StorageNodeID) (*
func (c *admin) ListStorageNodes(ctx context.Context) ([]vmspb.StorageNodeMetadata, error) {
rsp, err := c.rpcClient.ListStorageNodes(ctx, &vmspb.ListStorageNodesRequest{})
if err != nil {
return nil, errors.WithMessage(err, "admin: list storage nodes") //verrors.FromStatusError(err)
code := status.Convert(err).Code()
if code == codes.NotFound {
err = verrors.ErrNotExist
} else if code == codes.Unavailable {
err = verrors.ErrUnavailable
}
return nil, errors.WithMessage(err, "admin: list storage nodes")
}

if len(rsp.StorageNodes) > 0 {
Expand Down
11 changes: 6 additions & 5 deletions pkg/verrors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,12 @@ var (
)

var (
ErrInvalid = errors.New("invalid argument")
ErrExist = errors.New("already exists")
ErrNotExist = errors.New("not exist")
ErrState = errors.New("invalid state")
ErrClosed = errors.New("closed")
ErrInvalid = errors.New("invalid argument")
ErrExist = errors.New("already exists")
ErrNotExist = errors.New("not exist")
ErrUnavailable = errors.New("unavailable")
ErrState = errors.New("invalid state")
ErrClosed = errors.New("closed")

ErrIgnore = errors.New("ignore")
ErrInprogress = errors.New("inprogress")
Expand Down
12 changes: 10 additions & 2 deletions proto/vmspb/admin.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions proto/vmspb/admin.proto
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ option (gogoproto.goproto_unkeyed_all) = false;
option (gogoproto.goproto_unrecognized_all) = false;
option (gogoproto.goproto_sizecache_all) = false;

// StorageNodeMetadata represents the current status of the storage node.
message StorageNodeMetadata {
option (gogoproto.equal) = true;

Expand All @@ -26,11 +27,18 @@ message StorageNodeMetadata {
(gogoproto.embed) = true,
(gogoproto.jsontag) = ""
];
// CreateTime is the time when the storage node is created and registered to
// the metadata repository.
google.protobuf.Timestamp create_time = 2 [
(gogoproto.stdtime) = true,
(gogoproto.nullable) = false,
(gogoproto.jsontag) = "createTime"
];
// LastHeartbeatTime is the time when the admin server checked the liveness of
// the storage node. A zero value indicates that the admin server does not
// check the storage node. Typically, since the storage node is just
// registered, the admin server does not know the field. It is also possible
// that the admin server is just restarted.
google.protobuf.Timestamp last_heartbeat_time = 3 [
(gogoproto.stdtime) = true,
(gogoproto.nullable) = false,
Expand Down
61 changes: 61 additions & 0 deletions tests/it/failover/failover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/goleak"

"github.com/kakao/varlog/internal/admin"
"github.com/kakao/varlog/internal/admin/snwatcher"
"github.com/kakao/varlog/pkg/types"
"github.com/kakao/varlog/pkg/util/testutil"
"github.com/kakao/varlog/proto/varlogpb"
Expand Down Expand Up @@ -775,3 +777,62 @@ func TestVarlogFailoverUpdateLS(t *testing.T) {
})
}))
}

func TestAdmin_ListStorageNodes(t *testing.T) {
const numStorageNodes = 1
const tick = 100 * time.Millisecond

clus := it.NewVarlogCluster(t,
it.WithNumberOfStorageNodes(numStorageNodes),
it.WithVMSOptions(
admin.WithStorageNodeWatcherOptions(
snwatcher.WithTick(tick),
),
),
)
defer clus.Close(t)

old, err := clus.GetVMSClient(t).ListStorageNodes(context.Background())
require.NoError(t, err)
require.Len(t, old, 1)

clus.RestartVMS(t)

require.Eventually(t, func() bool {
snms, err := clus.GetVMSClient(t).ListStorageNodes(context.Background())
if !assert.NoError(t, err) || !assert.Len(t, snms, len(old)) {
return false
}
return snms[0].LastHeartbeatTime.After(old[0].LastHeartbeatTime)
}, 10*tick, tick)
}

func TestAdmin_GetStorageNode(t *testing.T) {
const numStorageNodes = 1
const tick = 100 * time.Millisecond

clus := it.NewVarlogCluster(t,
it.WithNumberOfStorageNodes(numStorageNodes),
it.WithVMSOptions(
admin.WithStorageNodeWatcherOptions(
snwatcher.WithTick(tick),
),
),
)
defer clus.Close(t)

snid := clus.StorageNodeIDAtIndex(t, 0)

old, err := clus.GetVMSClient(t).GetStorageNode(context.Background(), snid)
require.NoError(t, err)

clus.RestartVMS(t)

require.Eventually(t, func() bool {
snm, err := clus.GetVMSClient(t).GetStorageNode(context.Background(), snid)
if !assert.NoError(t, err) {
return false
}
return snm.LastHeartbeatTime.After(old.LastHeartbeatTime)
}, 10*tick, tick)
}
Loading

0 comments on commit 28c02ab

Please sign in to comment.