Skip to content

Commit

Permalink
fix(storagenode): support partial success/failure for append
Browse files Browse the repository at this point in the history
The Append RPC accepts a batch of log entries, and a batch may occasionally succeed only partially. The
storage node previously overlooked partial success; this PR fixes it to take partial success into account.
  • Loading branch information
ijsong committed Jun 1, 2023
1 parent ae21296 commit 9c0f857
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 9 deletions.
28 changes: 19 additions & 9 deletions internal/storagenode/logstream/append.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"time"

"go.uber.org/zap"

"github.com/kakao/varlog/internal/batchlet"
snerrors "github.com/kakao/varlog/internal/storagenode/errors"
"github.com/kakao/varlog/pkg/verrors"
Expand Down Expand Up @@ -155,16 +157,24 @@ func (lse *Executor) waitForCompletionOfAppends(ctx context.Context, dataBatchLe
result := make([]snpb.AppendResult, dataBatchLen)
for i := range awgs {
cerr := awgs[i].wait(ctx)
if err == nil && cerr != nil {
err = cerr
if cerr != nil {
result[i].Error = cerr.Error()
if err == nil {
err = cerr
}
continue
}
if cerr == nil {
result[i].Meta.TopicID = lse.tpid
result[i].Meta.LogStreamID = lse.lsid
result[i].Meta.GLSN = awgs[i].glsn
result[i].Meta.LLSN = awgs[i].llsn
awgs[i].release()
if err != nil {
lse.logger.Panic("Results of batch requests of Append RPC must not be interleaved with success and failure", zap.Error(err))
}
result[i].Meta.TopicID = lse.tpid
result[i].Meta.LogStreamID = lse.lsid
result[i].Meta.GLSN = awgs[i].glsn
result[i].Meta.LLSN = awgs[i].llsn
awgs[i].release()
}
if result[0].Meta.GLSN.Invalid() {
return nil, err
}
return result, err
return result, nil
}
61 changes: 61 additions & 0 deletions internal/storagenode/storagenode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,67 @@ func TestStorageNode_Append(t *testing.T) {
}()
},
},
{
name: "AppendBatch",
testf: func(t *testing.T, addr string, lc *client.LogClient) {
lss, lastGLSN := TestSealLogStreamReplica(t, cid, snid, tpid, lsid, types.InvalidGLSN, addr)
require.Equal(t, varlogpb.LogStreamStatusSealed, lss)
require.True(t, lastGLSN.Invalid())

TestUnsealLogStreamReplica(t, cid, snid, tpid, lsid, []varlogpb.LogStreamReplica{
{
StorageNode: varlogpb.StorageNode{
StorageNodeID: snid,
Address: addr,
},
TopicLogStream: varlogpb.TopicLogStream{
TopicID: tpid,
LogStreamID: lsid,
},
},
}, addr)

batch := [][]byte{[]byte("msg1"), []byte("msg2"), []byte("msg3")}
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
res, err := lc.Append(context.Background(), tpid, lsid, batch)
require.NoError(t, err)
require.Len(t, res, len(batch))
require.Empty(t, res[0].Error)
require.False(t, res[0].Meta.GLSN.Invalid())
require.NotEmpty(t, res[1].Error)
require.True(t, res[1].Meta.GLSN.Invalid())
require.NotEmpty(t, res[2].Error)
require.True(t, res[2].Meta.GLSN.Invalid())
}()

require.Eventually(t, func() bool {
reportcommitter.TestCommit(t, addr, snpb.CommitRequest{
StorageNodeID: snid,
CommitResult: snpb.LogStreamCommitResult{
TopicID: tpid,
LogStreamID: lsid,
CommittedLLSNOffset: 1,
CommittedGLSNOffset: 1,
CommittedGLSNLength: 1,
Version: 1,
HighWatermark: 1,
},
})
reports := reportcommitter.TestGetReport(t, addr)
require.Len(t, reports, 1)
return reports[0].Version == types.Version(1)
}, time.Second, 10*time.Millisecond)

lss, lastGLSN = TestSealLogStreamReplica(t, cid, snid, tpid, lsid, 1, addr)
require.Equal(t, varlogpb.LogStreamStatusSealed, lss)
require.Equal(t, types.GLSN(1), lastGLSN)

wg.Wait()
},
},
}

for _, tc := range tcs {
Expand Down

0 comments on commit 9c0f857

Please sign in to comment.