Skip to content

Commit

Permalink
Merge pull request #261 from ijsong/remove-head-tail-from-logstreamde…
Browse files Browse the repository at this point in the history
…scriptor

feat(client,storagenode): remove Head and Tail from proto/varlogpb.(LogStreamDescriptor)
  • Loading branch information
ijsong authored Dec 9, 2022
2 parents 50b5b6d + 57161c8 commit 6f36b0d
Show file tree
Hide file tree
Showing 19 changed files with 165 additions and 652 deletions.
11 changes: 0 additions & 11 deletions internal/storagenode/client/log_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,17 +157,6 @@ func (c *LogClient) TrimDeprecated(ctx context.Context, tpid types.TopicID, glsn
return nil
}

func (c *LogClient) LogStreamMetadata(ctx context.Context, tpid types.TopicID, lsid types.LogStreamID) (varlogpb.LogStreamDescriptor, error) {
rsp, err := c.rpcClient.LogStreamMetadata(ctx, &snpb.LogStreamMetadataRequest{
TopicID: tpid,
LogStreamID: lsid,
})
if err != nil {
return rsp.GetLogStreamDescriptor(), fmt.Errorf("logclient: %w", verrors.FromStatusError(err))
}
return rsp.GetLogStreamDescriptor(), nil
}

func (c *LogClient) LogStreamReplicaMetadata(ctx context.Context, tpid types.TopicID, lsid types.LogStreamID) (snpb.LogStreamReplicaMetadataDescriptor, error) {
rsp, err := c.rpcClient.LogStreamReplicaMetadata(ctx, &snpb.LogStreamReplicaMetadataRequest{
TopicID: tpid,
Expand Down
10 changes: 0 additions & 10 deletions internal/storagenode/log_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,13 +132,3 @@ func (ls logServer) LogStreamReplicaMetadata(_ context.Context, req *snpb.LogStr
}
return &snpb.LogStreamReplicaMetadataResponse{LogStreamReplica: lsrmd}, nil
}

func (ls logServer) LogStreamMetadata(_ context.Context, req *snpb.LogStreamMetadataRequest) (*snpb.LogStreamMetadataResponse, error) {
lse, loaded := ls.sn.executors.Load(req.TopicID, req.LogStreamID)
if !loaded {
return nil, errors.New("storage: no such logstream")
}

lsd, err := lse.LogStreamMetadata()
return &snpb.LogStreamMetadataResponse{LogStreamDescriptor: lsd}, err
}
40 changes: 0 additions & 40 deletions internal/storagenode/logstream/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,46 +497,6 @@ func (lse *Executor) metadataDescriptor(state executorState) snpb.LogStreamRepli
}
}

func (lse *Executor) LogStreamMetadata() (lsd varlogpb.LogStreamDescriptor, err error) {
atomic.AddInt64(&lse.inflight, 1)
defer atomic.AddInt64(&lse.inflight, -1)

if lse.esm.load() == executorStateClosed {
return lsd, verrors.ErrClosed
}

var status varlogpb.LogStreamStatus
switch lse.esm.load() {
case executorStateAppendable:
status = varlogpb.LogStreamStatusRunning
case executorStateSealing, executorStateLearning:
status = varlogpb.LogStreamStatusSealing
case executorStateSealed:
status = varlogpb.LogStreamStatusSealed
}

localLWM := lse.lsc.localLowWatermark()
localHWM := lse.lsc.localHighWatermark()
lsd = varlogpb.LogStreamDescriptor{
TopicID: lse.tpid,
LogStreamID: lse.lsid,
Status: status,
Head: varlogpb.LogEntryMeta{
TopicID: lse.tpid,
LogStreamID: lse.lsid,
LLSN: localLWM.LLSN,
GLSN: localLWM.GLSN,
},
Tail: varlogpb.LogEntryMeta{
TopicID: lse.tpid,
LogStreamID: lse.lsid,
LLSN: localHWM.LLSN,
GLSN: localHWM.GLSN,
},
}
return lsd, nil
}

func (lse *Executor) Trim(_ context.Context, glsn types.GLSN) error {
atomic.AddInt64(&lse.inflight, 1)
defer atomic.AddInt64(&lse.inflight, -1)
Expand Down
3 changes: 0 additions & 3 deletions internal/storagenode/logstream/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,6 @@ func TestExecutor_Closed(t *testing.T) {
_, err = lse.Metadata()
assert.ErrorIs(t, err, verrors.ErrClosed)

_, err = lse.LogStreamMetadata()
assert.ErrorIs(t, err, verrors.ErrClosed)

_, err = lse.SubscribeWithGLSN(types.MinGLSN, types.MinGLSN)
assert.ErrorIs(t, err, verrors.ErrClosed)

Expand Down
26 changes: 0 additions & 26 deletions pkg/varlog/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/kakao/varlog/pkg/types"
"github.com/kakao/varlog/pkg/util/runner"
"github.com/kakao/varlog/pkg/util/syncutil/atomicutil"
"github.com/kakao/varlog/proto/snpb"
"github.com/kakao/varlog/proto/varlogpb"
)

Expand All @@ -37,23 +36,6 @@ type Log interface {

Trim(ctx context.Context, topicID types.TopicID, until types.GLSN, opts TrimOption) error

// LogStreamMetadata returns metadata of log stream specified by the
// arguments tpid and lsid.
// It returns the first metadata that is fetched successfully from all
// replicas in the log stream.
// It returns an error if the log stream does not exist or fails to
// fetch metadata from all replicas.
//
// Deprecated: Use PeekLogStream
LogStreamMetadata(ctx context.Context, tpid types.TopicID, lsid types.LogStreamID) (varlogpb.LogStreamDescriptor, error)

// LogStreamReplicaMetadata returns metadata of log stream replica
// specified by the arguments tpid and lsid. It returns the first
// successful result among all replicas.
//
// Deprecated: Use PeekLogStream
LogStreamReplicaMetadata(ctx context.Context, tpid types.TopicID, lsid types.LogStreamID) (snpb.LogStreamReplicaMetadataDescriptor, error)

// PeekLogStream returns the log sequence numbers at the first and the
// last. It fetches the metadata for each replica of a log stream lsid
// concurrently and takes a result from either appendable or sealed
Expand Down Expand Up @@ -191,14 +173,6 @@ func (v *logImpl) Trim(ctx context.Context, topicID types.TopicID, until types.G
return v.trim(ctx, topicID, until, opts)
}

func (v *logImpl) LogStreamMetadata(ctx context.Context, tpid types.TopicID, lsid types.LogStreamID) (varlogpb.LogStreamDescriptor, error) {
return v.logStreamMetadata(ctx, tpid, lsid)
}

func (v *logImpl) LogStreamReplicaMetadata(ctx context.Context, tpid types.TopicID, lsid types.LogStreamID) (snpb.LogStreamReplicaMetadataDescriptor, error) {
return v.logStreamReplicaMetadata(ctx, tpid, lsid)
}

func (v *logImpl) PeekLogStream(ctx context.Context, tpid types.TopicID, lsid types.LogStreamID) (first varlogpb.LogSequenceNumber, last varlogpb.LogSequenceNumber, err error) {
return v.peekLogStream(ctx, tpid, lsid)
}
Expand Down
31 changes: 0 additions & 31 deletions pkg/varlog/log_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

51 changes: 0 additions & 51 deletions pkg/varlog/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"strings"
"sync"

"github.com/pkg/errors"
"go.uber.org/multierr"

"github.com/kakao/varlog/pkg/types"
Expand Down Expand Up @@ -99,56 +98,6 @@ func (v *logImpl) appendTo(ctx context.Context, tpid types.TopicID, lsid types.L
return res, nil
}

func (v *logImpl) logStreamMetadata(ctx context.Context, tpID types.TopicID, lsID types.LogStreamID) (lsd varlogpb.LogStreamDescriptor, err error) {
replicas, ok := v.replicasRetriever.Retrieve(tpID, lsID)
if !ok {
return varlogpb.LogStreamDescriptor{}, errNoLogStream
}

for _, replica := range replicas {
cl, cerr := v.logCLManager.GetOrConnect(ctx, replica.StorageNodeID, replica.Address)
if cerr != nil {
err = multierr.Append(err, cerr)
continue
}
lsd, cerr = cl.LogStreamMetadata(ctx, tpID, lsID)
if cerr != nil {
err = multierr.Append(err, cerr)
continue
}
if lsd.Status.Deleted() {
err = multierr.Append(err, errors.Errorf("invalid status: %s", lsd.Status.String()))
continue
}
return lsd, nil
}
return lsd, err
}

func (v *logImpl) logStreamReplicaMetadata(ctx context.Context, tpID types.TopicID, lsID types.LogStreamID) (snpb.LogStreamReplicaMetadataDescriptor, error) {
replicas, ok := v.replicasRetriever.Retrieve(tpID, lsID)
if !ok {
return snpb.LogStreamReplicaMetadataDescriptor{}, errNoLogStream
}

var err error
for _, replica := range replicas {
cl, cerr := v.logCLManager.GetOrConnect(ctx, replica.StorageNodeID, replica.Address)
if cerr != nil {
err = multierr.Append(err, cerr)
continue
}

lsrmd, cerr := cl.LogStreamReplicaMetadata(ctx, tpID, lsID)
if cerr != nil {
err = multierr.Append(err, cerr)
continue
}
return lsrmd, nil
}
return snpb.LogStreamReplicaMetadataDescriptor{}, err
}

func (v *logImpl) peekLogStream(ctx context.Context, tpid types.TopicID, lsid types.LogStreamID) (first varlogpb.LogSequenceNumber, last varlogpb.LogSequenceNumber, err error) {
replicas, ok := v.replicasRetriever.Retrieve(tpid, lsid)
if !ok {
Expand Down
88 changes: 0 additions & 88 deletions pkg/varlogtest/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,13 @@ package varlogtest
import (
"context"
"io"
"path/filepath"
"sync"

"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"

"github.com/kakao/varlog/internal/storagenode/volume"
"github.com/kakao/varlog/pkg/types"
"github.com/kakao/varlog/pkg/varlog"
"github.com/kakao/varlog/pkg/verrors"
"github.com/kakao/varlog/proto/snpb"
"github.com/kakao/varlog/proto/varlogpb"
)

Expand Down Expand Up @@ -273,90 +269,6 @@ func (c *testLog) Trim(ctx context.Context, topicID types.TopicID, until types.G
panic("not implemented")
}

func (c *testLog) LogStreamMetadata(_ context.Context, topicID types.TopicID, logStreamID types.LogStreamID) (varlogpb.LogStreamDescriptor, error) {
if err := c.lock(); err != nil {
return varlogpb.LogStreamDescriptor{}, err
}
defer c.unlock()

topicDesc, ok := c.vt.topics[topicID]
if !ok {
return varlogpb.LogStreamDescriptor{}, errors.New("no such topic")
}

if !topicDesc.HasLogStream(logStreamID) {
return varlogpb.LogStreamDescriptor{}, errors.New("no such log stream")
}

logStreamDesc, ok := c.vt.logStreams[logStreamID]
if !ok {
return varlogpb.LogStreamDescriptor{}, errors.New("no such log stream")
}

logStreamDesc = *proto.Clone(&logStreamDesc).(*varlogpb.LogStreamDescriptor)
head, tail := c.vt.peek(topicID, logStreamID)
logStreamDesc.Head = head //nolint:staticcheck
logStreamDesc.Tail = tail
return logStreamDesc, nil
}

func (c *testLog) LogStreamReplicaMetadata(_ context.Context, tpid types.TopicID, lsid types.LogStreamID) (snpb.LogStreamReplicaMetadataDescriptor, error) {
if err := c.lock(); err != nil {
return snpb.LogStreamReplicaMetadataDescriptor{}, err
}
defer c.unlock()

topicDesc, ok := c.vt.topics[tpid]
if !ok {
return snpb.LogStreamReplicaMetadataDescriptor{}, errors.New("no such topic")
}

if !topicDesc.HasLogStream(lsid) {
return snpb.LogStreamReplicaMetadataDescriptor{}, errors.New("no such log stream")
}

lsd, ok := c.vt.logStreams[lsid]
if !ok {
return snpb.LogStreamReplicaMetadataDescriptor{}, errors.New("no such log stream")
}

snid := lsd.Replicas[0].StorageNodeID
snmd, ok := c.vt.storageNodes[snid]
if !ok {
return snpb.LogStreamReplicaMetadataDescriptor{}, errors.New("no such storage node")
}

snpath := snmd.Storages[0].Path
dataPath := filepath.Join(snpath, volume.LogStreamDirName(tpid, lsid))

n := len(c.vt.globalLogEntries[tpid])
lastGLSN := c.vt.globalLogEntries[tpid][n-1].GLSN
head, tail := c.vt.peek(tpid, lsid)
return snpb.LogStreamReplicaMetadataDescriptor{
LogStreamReplica: varlogpb.LogStreamReplica{
StorageNode: varlogpb.StorageNode{
StorageNodeID: snid,
},
TopicLogStream: varlogpb.TopicLogStream{
TopicID: tpid,
LogStreamID: lsid,
},
},
Status: varlogpb.LogStreamStatusRunning,
Version: c.vt.version,
GlobalHighWatermark: lastGLSN,
LocalLowWatermark: varlogpb.LogSequenceNumber{
GLSN: head.GLSN,
LLSN: head.LLSN,
},
LocalHighWatermark: varlogpb.LogSequenceNumber{
GLSN: tail.GLSN,
LLSN: tail.LLSN,
},
Path: dataPath,
}, nil
}

func (c *testLog) PeekLogStream(ctx context.Context, tpid types.TopicID, lsid types.LogStreamID) (first varlogpb.LogSequenceNumber, last varlogpb.LogSequenceNumber, err error) {
if err = c.lock(); err != nil {
return first, last, err
Expand Down
11 changes: 0 additions & 11 deletions pkg/varlogtest/varlogtest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,17 +141,6 @@ func TestVarlogTest(t *testing.T) {
topicID := topicIDs[i]
logStreamID := addLogStream(topicID)

logStreamDesc, err := vlg.LogStreamMetadata(context.Background(), topicID, logStreamID) //nolint:staticcheck
require.NoError(t, err)
require.Equal(t, varlogpb.LogEntryMeta{
TopicID: topicID,
LogStreamID: logStreamID,
}, logStreamDesc.Head) //nolint:staticcheck
require.Equal(t, varlogpb.LogEntryMeta{
TopicID: topicID,
LogStreamID: logStreamID,
}, logStreamDesc.Tail)

first, last, err := vlg.PeekLogStream(context.Background(), topicID, logStreamID)
require.NoError(t, err)
require.True(t, first.Invalid())
Expand Down
Loading

0 comments on commit 6f36b0d

Please sign in to comment.