Skip to content

Commit

Permalink
feat(storagenode): change SyncInit to handle trimmed source replica
Browse files Browse the repository at this point in the history
This patch changes the behavior of SyncInit to handle empty source replicas. Previously, SyncInit
assumed that an empty source replica could not exist. The changes are as follows:

- The destination replica, i.e., the SyncInit RPC server, replies with a gRPC AlreadyExists error if
  it already has enough log entries and a commit context.
- The destination replica trims its local log entries if the source replica has already trimmed
  them, and receives a commit context if necessary.

Soon, we will change the behavior of Trim so that it can remove all log entries from a log stream
replica. This PR prepares for that change.

Updates #351
  • Loading branch information
ijsong committed Apr 5, 2023
1 parent 02106fa commit 79c5323
Show file tree
Hide file tree
Showing 8 changed files with 1,422 additions and 574 deletions.
805 changes: 669 additions & 136 deletions internal/storagenode/logstream/executor_test.go

Large diffs are not rendered by default.

229 changes: 153 additions & 76 deletions internal/storagenode/logstream/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (

"go.uber.org/multierr"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/kakao/varlog/internal/storage"
"github.com/kakao/varlog/pkg/rpc"
Expand Down Expand Up @@ -80,6 +82,18 @@ func (lse *Executor) Sync(ctx context.Context, dstReplica varlogpb.LogStreamRepl
return st.toSyncStatus(), nil
}

localLWM, localHWM, _ := lse.lsc.localWatermarks()

// The committedLLSNEnd is types.MinLLSN if the result of ReadCommitContext
// is storage.ErrNoCommitContext, since it means this log stream has not
// received a commit message from the metadata repository yet.
committedLLSNEnd := types.MinLLSN
if cc, err := lse.stg.ReadCommitContext(); err == nil {
committedLLSNEnd = cc.CommittedLLSNBegin + types.LLSN(cc.CommittedGLSNEnd-cc.CommittedGLSNBegin)
} else if err != storage.ErrNoCommitContext {
return nil, status.Errorf(codes.Internal, "sync: %s", err.Error())
}

rpcConn, err := rpc.NewConn(ctx, dstReplica.Address)
if err != nil {
return nil, err
Expand All @@ -98,35 +112,33 @@ func (lse *Executor) Sync(ctx context.Context, dstReplica varlogpb.LogStreamRepl
}
}()

localLWM, localHWM, _ := lse.lsc.localWatermarks()
syncRange, err := sc.syncInit(ctx, snpb.SyncRange{
FirstLLSN: localLWM.LLSN,
LastLLSN: localHWM.LLSN,
})
}, committedLLSNEnd-1)
if err != nil {
if status.Code(err) == codes.AlreadyExists {
err = nil
_ = sc.close()
return &snpb.SyncStatus{
State: snpb.SyncStateComplete,
First: snpb.SyncPosition{
LLSN: localLWM.LLSN,
GLSN: localLWM.GLSN,
},
Last: snpb.SyncPosition{
LLSN: localHWM.LLSN,
GLSN: localHWM.GLSN,
},
}, nil
}
return nil, err
}
if syncRange.FirstLLSN.Invalid() && syncRange.LastLLSN.Invalid() {
// NOTE: The sync client should be closed to avoid leaks of the
// gRPC connection and goroutine.
_ = sc.close()
return &snpb.SyncStatus{
State: snpb.SyncStateComplete,
First: snpb.SyncPosition{
LLSN: localLWM.LLSN,
GLSN: localLWM.GLSN,
},
Last: snpb.SyncPosition{
LLSN: localHWM.LLSN,
GLSN: localHWM.GLSN,
},
}, nil
}

// If the FirstLLSN of the sync range is greater than the LastLLSN of
// it, the destination has all log entries but commit context.
var first varlogpb.LogEntry
if syncRange.FirstLLSN <= syncRange.LastLLSN {
if !syncRange.FirstLLSN.Invalid() && syncRange.FirstLLSN <= syncRange.LastLLSN {
first, err = lse.stg.Read(storage.AtLLSN(syncRange.FirstLLSN))
if err != nil {
return nil, err
Expand Down Expand Up @@ -238,46 +250,56 @@ func (lse *Executor) syncLoop(_ context.Context, sc *syncClient, st *syncTracker
err = stream.SendMsg(req)
}

func (lse *Executor) SyncInit(_ context.Context, srcReplica varlogpb.LogStreamReplica, srcRange snpb.SyncRange) (syncRange snpb.SyncRange, err error) {
func (lse *Executor) SyncInit(_ context.Context, srcReplica varlogpb.LogStreamReplica, srcRange snpb.SyncRange, srcLastCommittedLLSN types.LLSN) (syncRange snpb.SyncRange, err error) {
atomic.AddInt64(&lse.inflight, 1)
defer atomic.AddInt64(&lse.inflight, -1)

lse.muAdmin.Lock()
defer lse.muAdmin.Unlock()

err = srcRange.Validate()
if err != nil {
err = status.Error(codes.InvalidArgument, err.Error())
return
}

if !srcRange.LastLLSN.Invalid() && srcRange.LastLLSN != srcLastCommittedLLSN {
err = status.Errorf(codes.InvalidArgument, "unmatched llsn: the last of range %d, the last committed llsn %d", srcRange.LastLLSN, srcLastCommittedLLSN)
return
}

if state := lse.esm.load(); state != executorStateSealing {
if state == executorStateClosed {
err = fmt.Errorf("log stream: sync init: %w", verrors.ErrClosed)
return
}
if state != executorStateLearning || (time.Since(lse.dstSyncInfo.lastSyncTime) < lse.syncTimeout) {
if state != executorStateLearning {
err = fmt.Errorf("log stream: sync init: invalid state %d: %w", state, verrors.ErrInvalid)
return
}
if elapsed := time.Since(lse.dstSyncInfo.lastSyncTime); elapsed < lse.syncTimeout {
err = fmt.Errorf("log stream: sync init: learning in-progress %s", elapsed)
return
}
// syncTimeout is expired.
lse.esm.store(executorStateSealing)
}

if srcRange.Invalid() {
err = fmt.Errorf("log stream: sync init: invalid range %s: %w", srcRange.String(), verrors.ErrInvalid)
return
}

_, _, uncommittedBegin, invalid := lse.lsc.reportCommitBase()
uncommittedLLSNBegin, uncommittedGLSNBegin := uncommittedBegin.LLSN, uncommittedBegin.GLSN
lastCommittedLLSN := uncommittedLLSNBegin - 1
if lastCommittedLLSN > srcRange.LastLLSN {
uncommittedLLSNBegin, _ := uncommittedBegin.LLSN, uncommittedBegin.GLSN
dstLastCommittedLLSN := uncommittedLLSNBegin - 1
if !srcRange.LastLLSN.Invalid() && srcRange.LastLLSN < dstLastCommittedLLSN {
lse.logger.Panic("sync init: destination of sync has too many logs",
zap.String("src_range", srcRange.String()),
zap.Uint64("last_committed_llsn", uint64(lastCommittedLLSN)),
zap.Uint64("last_committed_llsn", uint64(dstLastCommittedLLSN)),
)
}

// NOTE: When the replica has all log entries, it returns its range of logs and non-error results.
// In this case, this replica remains executorStateSealing.
// Breaking change: previously it returns ErrExist when the replica has all log entries to replicate.
if lastCommittedLLSN == srcRange.LastLLSN && !invalid {
return snpb.SyncRange{FirstLLSN: types.InvalidLLSN, LastLLSN: types.InvalidLLSN}, nil
if dstLastCommittedLLSN == srcRange.LastLLSN && !invalid {
return snpb.SyncRange{}, status.Errorf(codes.AlreadyExists, "already synchronized")
}

// The log stream replica will not send a report to the metadata
Expand All @@ -290,18 +312,68 @@ func (lse *Executor) SyncInit(_ context.Context, srcReplica varlogpb.LogStreamRe

trimGLSN := types.InvalidGLSN
lwm, _, _ := lse.lsc.localWatermarks()
alreadySynchronized := false

switch {
case srcRange.FirstLLSN.Invalid() && srcRange.LastLLSN.Invalid():
if srcLastCommittedLLSN < dstLastCommittedLLSN {
lse.logger.Panic("sync init: destination of sync had too many logs",
zap.Any("src_last_committed_llsn", srcLastCommittedLLSN),
zap.Any("dst_last_committed_llsn", dstLastCommittedLLSN),
)
} else if srcLastCommittedLLSN == dstLastCommittedLLSN && !invalid {
alreadySynchronized = true
}

if lwm.LLSN < srcRange.FirstLLSN && srcRange.FirstLLSN <= lastCommittedLLSN {
// The source replica has already trimmed some prefix log
// entries; thus, the destination replica should remove prefix
// log entries to be the same as the source.

// TODO: There are two things to do to avoid finding log
// entries here:
// - The source replica should propose a range of
// synchronization denoted by GLSN and LLSN.
// - Methods related to trim in the storage and log stream
// should accept exclusive boundaries rather than inclusive.
// The source replica does not have log entries when both the FirstLLSN
// and LastLLSN of the srcRange are InvalidLLSNs. Therefore, the
// destination replica must remove all log entries.
trimGLSN = types.MaxGLSN
lwm = varlogpb.LogSequenceNumber{LLSN: types.InvalidLLSN, GLSN: types.InvalidGLSN}
uncommittedBegin = varlogpb.LogSequenceNumber{
LLSN: srcLastCommittedLLSN,
GLSN: types.InvalidGLSN, // It is set to InvalidGLSN since it cannot be known.
}

syncRange = snpb.SyncRange{
FirstLLSN: types.InvalidLLSN,
LastLLSN: types.InvalidLLSN,
}

case srcRange.FirstLLSN < lwm.LLSN:
// The destination replica has a higher LowWatermark than the FirstLLSN
// of the SyncRange sent from the source replica, meaning the
// destination replica might have already been trimmed.
// To simplify the synchronization process, log entries in the
// destination replica will be cut, and then, the log entries will be
// copied from the source to the destination.
trimGLSN = types.MaxGLSN
lwm = varlogpb.LogSequenceNumber{LLSN: types.InvalidLLSN, GLSN: types.InvalidGLSN}
uncommittedBegin = varlogpb.LogSequenceNumber{
LLSN: srcRange.FirstLLSN,
GLSN: types.InvalidGLSN, // It is set to InvalidGLSN since it cannot be known.
}

syncRange = snpb.SyncRange{
FirstLLSN: srcRange.FirstLLSN,
LastLLSN: srcRange.LastLLSN,
}

case srcRange.FirstLLSN == lwm.LLSN:
// The destination replica must not remove log entries; therefore, it
// does not need to change the local low watermark.
// no need to trim
trimGLSN = types.InvalidGLSN
syncRange = snpb.SyncRange{
FirstLLSN: dstLastCommittedLLSN + 1,
LastLLSN: srcRange.LastLLSN,
}

case lwm.LLSN < srcRange.FirstLLSN && srcRange.FirstLLSN <= dstLastCommittedLLSN:
// The destination replica has to trim log entries lower than the
// FirstLLSN of the SyncRange because the source replica did.
// The local low watermark of the destination replica also has to be
// updated.
var entry varlogpb.LogEntry
entry, err = lse.stg.Read(storage.AtLLSN(srcRange.FirstLLSN - 1))
if err != nil {
Expand All @@ -317,38 +389,41 @@ func (lse *Executor) SyncInit(_ context.Context, srcReplica varlogpb.LogStreamRe
lse.esm.store(executorStateSealing)
return
}

// The local low watermark of the destination replica after
// trimming the prefix log entries should be the same as the
// first entry of the sync range.
lwm = varlogpb.LogSequenceNumber{
LLSN: entry.LLSN,
GLSN: entry.GLSN,
}
} else if srcRange.FirstLLSN < lwm.LLSN || lastCommittedLLSN < srcRange.FirstLLSN {
// The destination replica that has been trimmed prefix log
// entries as opposed to the source replica is unusual. In this
// case, the destination replica deletes all log entries to fix
// it. Similarly, if the source replica has already trimmed log
// entries that the destination has, the destination should
// delete all log entries.

syncRange = snpb.SyncRange{
FirstLLSN: dstLastCommittedLLSN + 1,
LastLLSN: srcRange.LastLLSN,
}

default: // dstLastCommittedLLSN < srcRange.FirstLLSN
// All log entries in the destination replica should be removed since
// the log entries in the source replica have already been removed. So
// the local low watermark in the destination replica has to be
// changed.
trimGLSN = types.MaxGLSN
lastCommittedLLSN = srcRange.FirstLLSN - 1
lwm = varlogpb.LogSequenceNumber{LLSN: types.InvalidLLSN, GLSN: types.InvalidGLSN}
uncommittedBegin = varlogpb.LogSequenceNumber{
LLSN: srcRange.FirstLLSN,
GLSN: types.InvalidGLSN, // It is set to InvalidGLSN since it cannot be known.
}

// Since the destination replica will trim all log entries, the
// local low and high watermarks will be invalid.
// The destination replica should invalidate its local high
// watermark by setting the GLSN of uncommittedBegin to
// invalid.
uncommittedGLSNBegin = types.InvalidGLSN
// Setting an invalid log sequence number to the local low
// watermark is necessary to invalidate it.
lwm = varlogpb.LogSequenceNumber{}
syncRange = snpb.SyncRange{
FirstLLSN: srcRange.FirstLLSN,
LastLLSN: srcRange.LastLLSN,
}
}

syncRange = snpb.SyncRange{
FirstLLSN: lastCommittedLLSN + 1,
LastLLSN: srcRange.LastLLSN,
// It is unnecessary to copy log entries if FirstLLSN is greater than
// LastLLSN, and it has to copy only the commit context.
if syncRange.FirstLLSN > syncRange.LastLLSN {
syncRange = snpb.SyncRange{
FirstLLSN: types.InvalidLLSN,
LastLLSN: types.InvalidLLSN,
}
}

if !trimGLSN.Invalid() {
Expand All @@ -362,19 +437,21 @@ func (lse *Executor) SyncInit(_ context.Context, srcReplica varlogpb.LogStreamRe
lse.lsc.setLocalLowWatermark(lwm)
}

if alreadySynchronized {
lse.esm.store(executorStateSealing)
return snpb.SyncRange{}, status.Errorf(codes.AlreadyExists, "already synchronized")
}

// NOTE: Invalid reportCommitBase makes the report of the log
// stream replica meaningless.
// The LLSN of uncommittedBegin indicates a sequence number of
// the following log entry copied from a source replica.
// Invalid GLSN of uncommittedBegin makes the local high
// watermark of the replica invalid.
lse.lsc.storeReportCommitBase(types.InvalidVersion, types.InvalidGLSN, varlogpb.LogSequenceNumber{
LLSN: lastCommittedLLSN + 1,
GLSN: uncommittedGLSNBegin,
}, true /*invalid*/)
// The LLSN of uncommittedBegin indicates a sequence number of the
// following log entry copied from a source replica.
// Invalid GLSN of uncommittedBegin makes the local high watermark of the
// replica invalid.
lse.lsc.storeReportCommitBase(types.InvalidVersion, types.InvalidGLSN, uncommittedBegin, true /*invalid*/)

// learning
lse.resetInternalState(lastCommittedLLSN, !lse.isPrimary())
lse.resetInternalState(dstLastCommittedLLSN, !lse.isPrimary())
lse.dstSyncInfo.lastSyncTime = time.Now()
lse.dstSyncInfo.srcReplica = srcReplica.StorageNodeID
return syncRange, nil
Expand Down
12 changes: 7 additions & 5 deletions internal/storagenode/logstream/sync_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"go.uber.org/zap"

"github.com/kakao/varlog/pkg/rpc"
"github.com/kakao/varlog/pkg/types"
"github.com/kakao/varlog/proto/snpb"
"github.com/kakao/varlog/proto/varlogpb"
)
Expand All @@ -32,12 +33,13 @@ func newSyncClient(cfg syncClientConfig) *syncClient {
return sc
}

// syncInit issues the SyncInit RPC through sc.rpcClient. The request carries
// the cluster ID, the source and destination replicas held by the sync
// client, the source's range of log entries (srcRange), and, in the newer
// signature, the source's last committed LLSN (lastCommittedLLSN).
//
// It returns the range taken from the response together with the error
// returned by the RPC call.
func (sc *syncClient) syncInit(ctx context.Context, srcRange snpb.SyncRange) (syncRange snpb.SyncRange, err error) {
func (sc *syncClient) syncInit(ctx context.Context, srcRange snpb.SyncRange, lastCommittedLLSN types.LLSN) (syncRange snpb.SyncRange, err error) {
rsp, err := sc.rpcClient.SyncInit(ctx, &snpb.SyncInitRequest{
ClusterID: sc.lse.cid,
Source: sc.srcReplica,
Destination: sc.dstReplica,
Range: srcRange,
ClusterID: sc.lse.cid,
Source: sc.srcReplica,
Destination: sc.dstReplica,
Range: srcRange,
LastCommittedLLSN: lastCommittedLLSN,
})
// The error is passed through unchanged; GetRange is invoked on rsp
// regardless of err.
// NOTE(review): presumably GetRange is a nil-safe generated getter that
// yields a zero SyncRange when rsp is nil — confirm against snpb.
return rsp.GetRange(), err
}
Expand Down
2 changes: 1 addition & 1 deletion internal/storagenode/replication_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (rs *replicationServer) SyncInit(ctx context.Context, req *snpb.SyncInitReq
if !loaded {
return nil, fmt.Errorf("replication server: no log stream %v", req.Destination.LogStreamID)
}
syncRange, err := lse.SyncInit(ctx, req.Source, req.Range)
syncRange, err := lse.SyncInit(ctx, req.Source, req.Range, req.LastCommittedLLSN)
return &snpb.SyncInitResponse{Range: syncRange}, err
}

Expand Down
Loading

0 comments on commit 79c5323

Please sign in to comment.