Skip to content

Commit

Permalink
fix(storagenode): accept SyncInit sent from trimmed source to new des…
Browse files Browse the repository at this point in the history
…tination

Storage nodes can be trimmed and synchronized. However, there is a bug: a new destination replica
that has just joined the log stream rejects the SyncInit RPC sent from a trimmed source replica. Both
replicas are empty and have no log entries; however, the source replica still has a commit context
indicating the last committed LLSN. In this situation, the destination replica must accept SyncInit
so that it can receive the commit context from the source replica, but it does not.

This PR fixes the above issue by changing the condition that the destination replica uses to decide
whether it is already synchronized with the source replica.

```go
    // Previous code: https://github.com/kakao/varlog/blob/5269481c0e80c2eebf8214116a2d1544a26cb443/internal/storagenode/logstream/sync.go#L297-L302
    //
    // NOTE: When the replica has all log entries, it returns its range of logs and non-error results.
    // In this case, this replica remains executorStateSealing.
    // Breaking change: previously it returns ErrExist when the replica has all log entries to replicate.
    if dstLastCommittedLLSN == srcRange.LastLLSN && !invalid {
        return snpb.SyncRange{}, status.Errorf(codes.AlreadyExists, "already synchronized")
    }
```

Since both replicas have no log entries, the condition `dstLastCommittedLLSN == srcRange.LastLLSN`
alone is not enough: it can hold (presumably because both sides are zero) even though the
destination replica has not yet received the source's commit context. This PR changes the condition
to `dstLastCommittedLLSN == srcLastCommittedLLSN && dstLastCommittedLLSN == srcRange.LastLLSN`.
Because `srcLastCommittedLLSN` is valid regardless of whether the source replica holds any log
entries, the destination replica now accepts the SyncInit.

Resolve #478
  • Loading branch information
ijsong committed Jun 13, 2023
1 parent 7898697 commit ecb0a2c
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 48 deletions.
43 changes: 43 additions & 0 deletions internal/storagenode/logstream/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2697,6 +2697,49 @@ func TestExecutorSyncInit(t *testing.T) {
require.Equal(t, executorStateLearning, dst.esm.load())
},
},
{
	// Scenario: a trimmed source replica synchronizes an empty destination.
	//
	// dst logs     : no logs
	// dst last llsn: no commit context
	// src logs     : no logs
	// src last llsn: 10
	// response     : [first, last] = [0, 0]
	//
	// Neither replica holds log entries, but the source reports a last
	// committed LLSN of 10. SyncInit must therefore succeed (with an empty
	// sync range) instead of reporting that the replicas are already
	// synchronized.
	name: "TrimmedSource_EmptyDestination_Synchronize",
	pref: func(t *testing.T, dst *Executor) {
		// Remove every log entry that the shared fixture wrote to dst.
		for i := 1; i <= dstLastLSN; i++ {
			storage.TestDeleteLogEntry(t, dst.stg, varlogpb.LogSequenceNumber{
				LLSN: types.LLSN(i),
				GLSN: types.GLSN(i),
			})
		}
		// The commit context is removed exactly once, outside the loop, so
		// that it is deleted regardless of how many log entries existed.
		storage.TestDeleteCommitContext(t, dst.stg)
	},
	testf: func(t *testing.T, dst *Executor, src varlogpb.LogStreamReplica) {
		// Empty source range with last committed LLSN 10.
		syncRange, err := dst.SyncInit(context.Background(), src, snpb.SyncRange{}, 10)
		require.NoError(t, err)
		require.Equal(t, snpb.SyncRange{FirstLLSN: types.InvalidLLSN, LastLLSN: types.InvalidLLSN}, syncRange)

		// The destination has nothing committed or uncommitted to report.
		rpt, err := dst.Report(context.Background())
		require.NoError(t, err)
		require.Equal(t, snpb.LogStreamUncommitReport{
			LogStreamID:           lsid,
			UncommittedLLSNOffset: 0,
			UncommittedLLSNLength: 0,
			Version:               0,
			HighWatermark:         0,
		}, rpt)

		// Metadata still reports the sealing status and zeroed watermarks.
		lsrmd, err := dst.Metadata()
		require.NoError(t, err)
		require.Equal(t, varlogpb.LogStreamStatusSealing, lsrmd.Status)
		require.Equal(t, varlogpb.LogSequenceNumber{LLSN: 0, GLSN: 0}, lsrmd.LocalLowWatermark)
		require.Equal(t, varlogpb.LogSequenceNumber{LLSN: 0, GLSN: 0}, lsrmd.LocalHighWatermark)
		require.Equal(t, types.InvalidVersion, lsrmd.Version)
		require.Equal(t, types.InvalidGLSN, lsrmd.GlobalHighWatermark)

		// Accepting SyncInit moves the executor into the learning state.
		require.Equal(t, executorStateLearning, dst.esm.load())
	},
},
{
// dst logs : 1, 2, ......, 10
// dst last llsn: 10
Expand Down
93 changes: 48 additions & 45 deletions internal/storagenode/logstream/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,21 +134,24 @@ func (lse *Executor) Sync(ctx context.Context, dstReplica varlogpb.LogStreamRepl
return nil, err
}

// If the FirstLLSN of the sync range is greater than the LastLLSN of
// it, the destination has all log entries but commit context.
var first varlogpb.LogEntry
if !syncRange.FirstLLSN.Invalid() && syncRange.FirstLLSN <= syncRange.LastLLSN {
first, err = lse.stg.Read(storage.AtLLSN(syncRange.FirstLLSN))
var first, last varlogpb.LogSequenceNumber
if !syncRange.FirstLLSN.Invalid() {
logEntry, err := lse.stg.Read(storage.AtLLSN(syncRange.FirstLLSN))
if err != nil {
return nil, err
}
first = varlogpb.LogSequenceNumber{LLSN: logEntry.LLSN, GLSN: logEntry.GLSN}
}
if !syncRange.LastLLSN.Invalid() {
logEntry, err := lse.stg.Read(storage.AtLLSN(syncRange.LastLLSN))
if err != nil {
return nil, err
}
last = varlogpb.LogSequenceNumber{LLSN: logEntry.LLSN, GLSN: logEntry.GLSN}
}

// make tracker
st := newSyncTracker(varlogpb.LogSequenceNumber{
LLSN: first.LogEntryMeta.LLSN,
GLSN: first.LogEntryMeta.GLSN,
}, localHWM)
st := newSyncTracker(first, last)
lse.sts[dstReplica.StorageNodeID] = st
_, _ = lse.syncRunner.Run(func(ctx context.Context) {
snid := sc.dstReplica.StorageNodeID
Expand Down Expand Up @@ -229,14 +232,18 @@ func (lse *Executor) syncLoop(_ context.Context, sc *syncClient, st *syncTracker
err = fmt.Errorf("commit context: %w", err)
return
}
if cc.CommittedLLSNBegin+types.LLSN(cc.CommittedGLSNEnd-cc.CommittedGLSNBegin)-1 != st.syncRange.last.LLSN {
err = fmt.Errorf("commit context: invalid LLSN: %+v", cc)
}
if cc.CommittedGLSNEnd-1 != st.syncRange.last.GLSN {
err = fmt.Errorf("commit context: invalid GLSN: %+v", cc)
}
if err != nil {
return
if !st.syncRange.first.LLSN.Invalid() {
// If the first LLSN of the sync range is invalid, there isn't a log entry to copy.
lastCommittedLLSN := cc.CommittedLLSNBegin + types.LLSN(cc.CommittedGLSNEnd-cc.CommittedGLSNBegin) - 1
if lastCommittedLLSN != st.syncRange.last.LLSN {
err = fmt.Errorf("commit context: invalid LLSN: %+v", cc)
}
if cc.CommittedGLSNEnd-1 != st.syncRange.last.GLSN {
err = fmt.Errorf("commit context: invalid GLSN: %+v", cc)
}
if err != nil {
return
}
}
req.Payload.LogEntry = nil
req.Payload.CommitContext = &varlogpb.CommitContext{
Expand Down Expand Up @@ -297,7 +304,7 @@ func (lse *Executor) SyncInit(_ context.Context, srcReplica varlogpb.LogStreamRe
// NOTE: When the replica has all log entries, it returns its range of logs and non-error results.
// In this case, this replica remains executorStateSealing.
// Breaking change: previously it returns ErrExist when the replica has all log entries to replicate.
if dstLastCommittedLLSN == srcRange.LastLLSN && !invalid {
if dstLastCommittedLLSN == srcLastCommittedLLSN && dstLastCommittedLLSN == srcRange.LastLLSN && !invalid {
return snpb.SyncRange{}, status.Errorf(codes.AlreadyExists, "already synchronized")
}

Expand Down Expand Up @@ -330,7 +337,7 @@ func (lse *Executor) SyncInit(_ context.Context, srcReplica varlogpb.LogStreamRe
trimGLSN = types.MaxGLSN
lwm = varlogpb.LogSequenceNumber{LLSN: types.InvalidLLSN, GLSN: types.InvalidGLSN}
uncommittedBegin = varlogpb.LogSequenceNumber{
LLSN: srcLastCommittedLLSN,
LLSN: srcLastCommittedLLSN + 1,
GLSN: types.InvalidGLSN, // It is set to InvalidGLSN since it cannot be known.
}

Expand Down Expand Up @@ -488,37 +495,32 @@ func (lse *Executor) SyncReplicate(_ context.Context, srcReplica varlogpb.LogStr
}
}()

var lem *varlogpb.LogEntryMeta
ver, hwm, uncommittedBegin, invalid := lse.lsc.reportCommitBase()
uncommittedLLSNBegin := uncommittedBegin.LLSN
uncommittedGLSNBegin := uncommittedBegin.GLSN

if entry := payload.LogEntry; entry != nil {
if entry.LLSN != uncommittedLLSNBegin {
err = fmt.Errorf("log stream: sync replicate: unexpected log entry: expected_llsn=%v, actual_llsn=%v", uncommittedLLSNBegin, entry.LLSN)
logEntry := payload.LogEntry
if logEntry != nil {
if logEntry.LLSN != uncommittedBegin.LLSN {
err = fmt.Errorf("log stream: sync replicate: unexpected log entry: expected_llsn=%v, actual_llsn=%v", uncommittedBegin.LLSN, logEntry.LLSN)
return err
}

err = batch.SetLogEntry(entry.LLSN, entry.GLSN, entry.Data)
err = batch.SetLogEntry(logEntry.LLSN, logEntry.GLSN, logEntry.Data)
if err != nil {
return err
}
if ce := lse.logger.Check(zap.DebugLevel, "log stream: sync replicate: copy"); ce != nil {
ce.Write(zap.String("log entry", entry.String()))
ce.Write(zap.String("log entry", logEntry.String()))
}
uncommittedLLSNBegin = entry.LLSN + 1
uncommittedGLSNBegin = entry.GLSN + 1
lem = &varlogpb.LogEntryMeta{
TopicID: lse.tpid,
LogStreamID: lse.lsid,
LLSN: entry.LLSN,
GLSN: entry.GLSN,

uncommittedBegin = varlogpb.LogSequenceNumber{
LLSN: logEntry.LLSN + 1,
GLSN: logEntry.GLSN + 1, // It might not be exact.
}
invalid = true
}
if cc := payload.CommitContext; cc != nil {
lastLLSN := cc.CommittedLLSNBegin + types.LLSN(cc.CommittedGLSNEnd-cc.CommittedGLSNBegin) - 1
if lastLLSN != uncommittedLLSNBegin-1 {
err = fmt.Errorf("log stream: sync replicate: unexpected commit context: expected_last_llsn=%v, actual_last_llsn=%v", uncommittedLLSNBegin-1, lastLLSN)
if lastLLSN != uncommittedBegin.LLSN-1 {
err = fmt.Errorf("log stream: sync replicate: unexpected commit context: expected_last_llsn=%v, actual_last_llsn=%v", uncommittedBegin.LLSN-1, lastLLSN)
return err
}

Expand All @@ -538,6 +540,10 @@ func (lse *Executor) SyncReplicate(_ context.Context, srcReplica varlogpb.LogStr

ver = cc.Version
hwm = cc.HighWatermark
uncommittedBegin = varlogpb.LogSequenceNumber{
LLSN: cc.CommittedLLSNBegin + types.LLSN(cc.CommittedGLSNEnd-cc.CommittedGLSNBegin),
GLSN: cc.CommittedGLSNEnd,
}
invalid = false
done = true
}
Expand All @@ -546,17 +552,14 @@ func (lse *Executor) SyncReplicate(_ context.Context, srcReplica varlogpb.LogStr
return err
}

if lem != nil {
if logEntry != nil {
lse.lsc.localLWM.CompareAndSwap(varlogpb.LogSequenceNumber{}, varlogpb.LogSequenceNumber{
LLSN: lem.LLSN,
GLSN: lem.GLSN,
LLSN: logEntry.LLSN,
GLSN: logEntry.GLSN,
})
}
lse.lsc.storeReportCommitBase(ver, hwm, varlogpb.LogSequenceNumber{
LLSN: uncommittedLLSNBegin,
GLSN: uncommittedGLSNBegin,
}, invalid)
lse.lsc.uncommittedLLSNEnd.Store(uncommittedLLSNBegin)
lse.lsc.storeReportCommitBase(ver, hwm, uncommittedBegin, invalid)
lse.lsc.uncommittedLLSNEnd.Store(uncommittedBegin.LLSN)
lse.dstSyncInfo.lastSyncTime = time.Now()
return nil
}
12 changes: 12 additions & 0 deletions internal/storagenode/logstream/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ import (

"github.com/kakao/varlog/internal/storage"
"github.com/kakao/varlog/pkg/rpc"
"github.com/kakao/varlog/pkg/types"
"github.com/kakao/varlog/proto/snpb"
"github.com/kakao/varlog/proto/snpb/mock"
"github.com/kakao/varlog/proto/varlogpb"
)

type testReplicateServer struct {
Expand Down Expand Up @@ -91,3 +93,13 @@ func TestGetStorage(t *testing.T, lse *Executor) *storage.Storage {
require.NotNil(t, lse.stg)
return lse.stg
}

// TestGetReportCommitBase returns the values produced by the executor's
// reportCommitBase: the commit version, the high watermark, the beginning of
// the uncommitted range, and the invalid flag. It fails the test if lse is
// nil. It is intended for use in tests only.
func TestGetReportCommitBase(t *testing.T, lse *Executor) (commitVersion types.Version, highWatermark types.GLSN, uncommittedBegin varlogpb.LogSequenceNumber, invalid bool) {
	// Attribute assertion failures to the calling test rather than this helper.
	t.Helper()
	require.NotNil(t, lse)
	return lse.lsc.reportCommitBase()
}

// TestGetUncommittedLLSNEnd returns the current value of the executor's
// uncommittedLLSNEnd. It fails the test if lse is nil. It is intended for use
// in tests only.
func TestGetUncommittedLLSNEnd(t *testing.T, lse *Executor) types.LLSN {
	// Attribute assertion failures to the calling test rather than this helper.
	t.Helper()
	require.NotNil(t, lse)
	return lse.lsc.uncommittedLLSNEnd.Load()
}
3 changes: 2 additions & 1 deletion internal/storagenode/replication_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,9 @@ func (rs *replicationServer) SyncReplicateStream(stream snpb.Replicator_SyncRepl
if err != nil {
if err == io.EOF {
err = nil
} else {
err = fmt.Errorf("replication server: sync replicate stream: %w", err)
}
err = fmt.Errorf("replication server: sync replicate stream: %w", err)
break
}

Expand Down
61 changes: 59 additions & 2 deletions internal/storagenode/storagenode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2182,6 +2182,63 @@ func TestStorageNode_Sync(t *testing.T) {
}, snmd.LogStreamReplicas[0].LocalHighWatermark)
},
},
{
	// Scenario: the source has been fully trimmed and keeps only its commit
	// context; the destination is empty. Synchronization must still complete
	// and both replicas must end up ready to accept appends.
	//
	// ver: +-1-+
	// src: <commit context = 2>
	// dst: <empty>
	name: "TrimmedSourceEmptyDestination",
	testf: func(t *testing.T, src, dst *StorageNode) {
		const ver = types.Version(1)
		lastCommittedGLSN := lastGLSN(ver)

		// Write and commit on src, then trim everything that was committed.
		put(t, src, ver)
		trim(t, src, lastCommittedGLSN)
		lss, localHWM := TestSealLogStreamReplica(t, cid, src.snid, tpid, lsid, lastCommittedGLSN, src.advertise)
		require.Equal(t, varlogpb.LogStreamStatusSealed, lss)
		// NOTE: localHWM in src is 0 since src was trimmed.
		require.True(t, localHWM.Invalid())

		// dst has nothing yet, so it stays in sealing until it is synchronized.
		lss, localHWM = TestSealLogStreamReplica(t, cid, dst.snid, tpid, lsid, lastCommittedGLSN, dst.advertise)
		require.Equal(t, varlogpb.LogStreamStatusSealing, lss)
		require.Equal(t, types.InvalidGLSN, localHWM)

		// Kick off synchronization from src to dst.
		syncStatus := TestSync(t, cid, src.snid, tpid, lsid, 0 /*unused*/, src.advertise, varlogpb.StorageNode{
			StorageNodeID: dst.snid,
			Address:       dst.advertise,
		})
		require.Equal(t, snpb.SyncStateStart, syncStatus.State)

		// Poll until the synchronization is reported as complete.
		require.Eventually(t, func() bool {
			syncStatus := TestSync(t, cid, src.snid, tpid, lsid, 0 /*unused*/, src.advertise, varlogpb.StorageNode{
				StorageNodeID: dst.snid,
				Address:       dst.advertise,
			})
			return syncStatus.State == snpb.SyncStateComplete
		}, 10*time.Second, 100*time.Millisecond)

		lss, localHWM = TestSealLogStreamReplica(t, cid, dst.snid, tpid, lsid, lastCommittedGLSN, dst.advertise)
		require.Equal(t, varlogpb.LogStreamStatusSealed, lss)
		// NOTE: localHWM in dst is 0 since dst has no log entries.
		require.True(t, localHWM.Invalid())

		// Check those replicas are ready to accept Append.
		for _, sn := range []*StorageNode{src, dst} {
			TestUnsealLogStreamReplica(t, cid, sn.snid, tpid, lsid, makeReplicas(sn), sn.advertise)

			// Look up the executor of the node under inspection (sn) so that
			// the destination replica is verified as well as the source.
			lse, ok := sn.executors.Load(tpid, lsid)
			require.True(t, ok)

			version, hwm, uncommittedBegin, invalid := logstream.TestGetReportCommitBase(t, lse)
			require.Equal(t, ver, version)
			require.Equal(t, lastCommittedGLSN, hwm)
			require.Equal(t, varlogpb.LogSequenceNumber{LLSN: 3, GLSN: 3}, uncommittedBegin)
			require.False(t, invalid)

			uncommittedLLSNEnd := logstream.TestGetUncommittedLLSNEnd(t, lse)
			require.Equal(t, types.LLSN(lastCommittedGLSN+1), uncommittedLLSNEnd)
		}
	},
},
}

for _, tc := range tcs {
Expand Down Expand Up @@ -2217,8 +2274,8 @@ func TestStorageNode_Sync(t *testing.T) {

TestAddLogStreamReplica(t, cid, sn.snid, tpid, lsid, sn.snPaths[0], sn.advertise)

status, localHWM := TestSealLogStreamReplica(t, cid, sn.snid, tpid, lsid, types.InvalidGLSN, sn.advertise)
require.Equal(t, varlogpb.LogStreamStatusSealed, status)
lss, localHWM := TestSealLogStreamReplica(t, cid, sn.snid, tpid, lsid, types.InvalidGLSN, sn.advertise)
require.Equal(t, varlogpb.LogStreamStatusSealed, lss)
require.Equal(t, types.InvalidGLSN, localHWM)

TestUnsealLogStreamReplica(t, cid, sn.snid, tpid, lsid, makeReplicas(sn), sn.advertise)
Expand Down

0 comments on commit ecb0a2c

Please sign in to comment.