feat(client): support goroutine-safety of pkg/varlog.(LogStreamAppender).AppendBatch #472

Merged
merged 1 commit on Jun 16, 2023
5 changes: 3 additions & 2 deletions pkg/varlog/config.go
@@ -4,6 +4,7 @@ import (
"context"
"time"

"github.com/kakao/varlog/internal/storagenode"
"github.com/kakao/varlog/pkg/types"
"github.com/kakao/varlog/proto/varlogpb"
)
@@ -97,8 +98,8 @@ func WithTimeout(timeout time.Duration) AdminCallOption {

const (
defaultPipelineSize = 2
minPipelineSize = 1
maxPipelineSize = 8
minPipelineSize = storagenode.MinAppendPipelineSize
maxPipelineSize = storagenode.MaxAppendPipelineSize
)

type logStreamAppenderConfig struct {
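
For reference, a minimal sketch of how the pipeline-size bounds are consumed on the client side; openAppender is an illustrative helper (not part of this change), and the concrete min/max values now come from the storagenode constants above:

```go
package example

import (
	"github.com/kakao/varlog/pkg/types"
	"github.com/kakao/varlog/pkg/varlog"
)

// openAppender opens a LogStreamAppender with an explicit pipeline size.
// WithPipelineSize(2) matches the package default; values outside the
// min/max bounds defined in config.go are presumably rejected by the client.
func openAppender(vcli varlog.Log, tpid types.TopicID, lsid types.LogStreamID) (varlog.LogStreamAppender, error) {
	return vcli.NewLogStreamAppender(tpid, lsid, varlog.WithPipelineSize(2))
}
```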
72 changes: 53 additions & 19 deletions pkg/varlog/log_stream_appender.go
@@ -30,6 +30,7 @@ type LogStreamAppender interface {
// The only error from the AppendBatch is ErrClosed, which is returned when
// the LogStreamAppender is already closed. It returns nil even if the
// underlying stream is disconnected and notifies errors via callback.
//
// It is safe to have multiple goroutines calling AppendBatch
// simultaneously, but the order between them is not guaranteed.
AppendBatch(dataBatch [][]byte, callback BatchCallback) error
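
A usage sketch of the concurrency guarantee described above: several goroutines share one appender, and each result arrives via its callback. The helper below is illustrative, not part of this change:

```go
package example

import (
	"log"
	"sync"

	"github.com/kakao/varlog/pkg/varlog"
	"github.com/kakao/varlog/proto/varlogpb"
)

// appendConcurrently issues one AppendBatch per goroutine. AppendBatch only
// returns ErrClosed; all other errors are delivered asynchronously through
// the callback, and ordering across goroutines is not guaranteed.
func appendConcurrently(lsa varlog.LogStreamAppender, batches [][][]byte) {
	var wg sync.WaitGroup
	for _, batch := range batches {
		batch := batch
		wg.Add(1)
		go func() {
			defer wg.Done()
			err := lsa.AppendBatch(batch, func(metas []varlogpb.LogEntryMeta, cberr error) {
				if cberr != nil {
					log.Printf("append failed: %v", cberr)
					return
				}
				log.Printf("appended %d entries", len(metas))
			})
			if err != nil {
				log.Printf("appender closed: %v", err)
			}
		}()
	}
	wg.Wait()
}
```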
@@ -48,8 +49,9 @@ type LogStreamAppender interface {
type BatchCallback func([]varlogpb.LogEntryMeta, error)

type cbQueueEntry struct {
cb BatchCallback
err error
cb BatchCallback
data [][]byte
err error
}

func newCallbackQueueEntry() *cbQueueEntry {
@@ -71,8 +73,10 @@ type logStreamAppender struct {
logStreamAppenderConfig
stream snpb.LogIO_AppendClient
cancelFunc context.CancelCauseFunc
causeFunc func() error
sema chan struct{}
cbq chan *cbQueueEntry
sq chan *cbQueueEntry
rq chan *cbQueueEntry
wg sync.WaitGroup
closed struct {
xsync.RBMutex
@@ -110,10 +114,15 @@ func (v *logImpl) newLogStreamAppender(ctx context.Context, tpid types.TopicID,
logStreamAppenderConfig: cfg,
stream: stream,
sema: make(chan struct{}, cfg.pipelineSize),
cbq: make(chan *cbQueueEntry, cfg.pipelineSize),
sq: make(chan *cbQueueEntry, cfg.pipelineSize),
rq: make(chan *cbQueueEntry, cfg.pipelineSize),
cancelFunc: cancelFunc,
causeFunc: func() error {
return context.Cause(ctx)
},
}
lsa.wg.Add(1)
lsa.wg.Add(2)
go lsa.sendLoop()
go lsa.recvLoop()
return lsa, nil
}
@@ -128,23 +137,14 @@ func (lsa *logStreamAppender) AppendBatch(dataBatch [][]byte, callback BatchCall
lsa.sema <- struct{}{}

qe := newCallbackQueueEntry()
qe.data = dataBatch
qe.cb = callback

err := lsa.stream.Send(&snpb.AppendRequest{
TopicID: lsa.tpid,
LogStreamID: lsa.lsid,
Payload: dataBatch,
})
if err != nil {
_ = lsa.stream.CloseSend()
qe.err = err
}
lsa.cbq <- qe
lsa.sq <- qe
return nil
}

func (lsa *logStreamAppender) Close() {
lsa.cancelFunc(nil)
lsa.cancelFunc(ErrClosed)

lsa.closed.Lock()
defer lsa.closed.Unlock()
@@ -153,10 +153,39 @@ func (lsa *logStreamAppender) Close() {
}
lsa.closed.value = true

close(lsa.cbq)
close(lsa.sq)
lsa.wg.Wait()
}

func (lsa *logStreamAppender) sendLoop() {
defer func() {
close(lsa.rq)
lsa.wg.Done()
}()

var sendErr error
req := &snpb.AppendRequest{
TopicID: lsa.tpid,
LogStreamID: lsa.lsid,
}
for qe := range lsa.sq {
if sendErr == nil {
req.Payload = qe.data
sendErr = lsa.stream.Send(req)
if sendErr != nil {
if cause := lsa.causeFunc(); cause != nil {
sendErr = cause
}
_ = lsa.stream.CloseSend()
}
}
if sendErr != nil {
qe.err = sendErr
}
lsa.rq <- qe
}
}

func (lsa *logStreamAppender) recvLoop() {
defer lsa.wg.Done()

@@ -165,7 +194,7 @@ func (lsa *logStreamAppender) recvLoop() {
var cb BatchCallback
rsp := &snpb.AppendResponse{}

for qe := range lsa.cbq {
for qe := range lsa.rq {
meta = nil
err = qe.err
if err != nil {
@@ -188,6 +217,11 @@
break
}
Call:
if err != nil {
if cause := lsa.causeFunc(); cause != nil {
err = cause
}
}
if qe.cb != nil {
cb = qe.cb
} else {
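
The change above replaces the single callback queue (cbq) with a send queue (sq) drained by sendLoop and a receive queue (rq) drained by recvLoop, so stream.Send is issued from exactly one goroutine while AppendBatch may be called concurrently. A self-contained sketch of that two-stage pipeline, with plain prints standing in for the gRPC Send/Recv and the user callback:

```go
package main

import (
	"fmt"
	"sync"
)

// entry mirrors cbQueueEntry: the payload travels to the send loop, and any
// send error is attached before the entry is handed to the receive loop.
type entry struct {
	data string
	err  error
}

type pipeline struct {
	sq chan *entry // filled by Append, drained by sendLoop
	rq chan *entry // filled by sendLoop, drained by recvLoop
	wg sync.WaitGroup
}

func newPipeline(size int) *pipeline {
	p := &pipeline{
		sq: make(chan *entry, size),
		rq: make(chan *entry, size),
	}
	p.wg.Add(2)
	go p.sendLoop()
	go p.recvLoop()
	return p
}

// Append only enqueues; the single sendLoop goroutine serializes the actual
// sends, which is what makes concurrent Append calls safe. The real
// implementation additionally guards this with a semaphore and a closed flag.
func (p *pipeline) Append(data string) {
	p.sq <- &entry{data: data}
}

func (p *pipeline) sendLoop() {
	defer func() {
		close(p.rq) // lets recvLoop drain the remaining entries and exit
		p.wg.Done()
	}()
	for e := range p.sq {
		fmt.Println("send:", e.data) // stand-in for stream.Send
		p.rq <- e
	}
}

func (p *pipeline) recvLoop() {
	defer p.wg.Done()
	for e := range p.rq {
		if e.err != nil { // a failed send is reported without waiting for a response
			fmt.Println("error:", e.err)
			continue
		}
		fmt.Println("acked:", e.data) // stand-in for stream.Recv plus the callback
	}
}

func (p *pipeline) Close() {
	close(p.sq)
	p.wg.Wait()
}

func main() {
	p := newPipeline(2)
	p.Append("a")
	p.Append("b")
	p.Close()
}
```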
35 changes: 32 additions & 3 deletions tests/it/cluster/client_test.go
@@ -12,8 +12,6 @@ import (
. "github.com/smartystreets/goconvey/convey"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/kakao/varlog/pkg/types"
"github.com/kakao/varlog/pkg/util/testutil"
@@ -780,7 +778,7 @@ func TestLogStreamAppender(t *testing.T) {
cb := func(metas []varlogpb.LogEntryMeta, err error) {
called.Add(1)
if err != nil {
assert.Equal(t, codes.Canceled, status.Code(err))
assert.Equal(t, varlog.ErrClosed, err)
return
}
assert.NoError(t, err)
@@ -833,6 +831,37 @@
}, 5*time.Second, 100*time.Millisecond)
},
},
{
name: "ConcurrentAppendBatch",
testf: func(t *testing.T, tpid types.TopicID, lsid types.LogStreamID, vcli varlog.Log) {
lsa, err := vcli.NewLogStreamAppender(tpid, lsid, varlog.WithPipelineSize(pipelineSize))
require.NoError(t, err)
defer func() {
lsa.Close()
}()

var wg sync.WaitGroup
wg.Add(calls * 2)
expected := 0
dataBatch := [][]byte{[]byte("foo")}
cb := func(metas []varlogpb.LogEntryMeta, err error) {
defer wg.Done()
assert.NoError(t, err)
assert.Len(t, metas, 1)
expected++
assert.EqualValues(t, expected, metas[0].LLSN)
assert.EqualValues(t, expected, metas[0].GLSN)
}
for i := 0; i < calls; i++ {
go func() {
defer wg.Done()
err := lsa.AppendBatch(dataBatch, cb)
require.NoError(t, err)
}()
}
wg.Wait()
},
},
}

for _, tc := range tcs {