feat(client): add LogStreamAppender #459

Merged 1 commit on Jun 7, 2023
4 changes: 4 additions & 0 deletions internal/storagenode/client/log_client.go
@@ -70,6 +70,10 @@ func (c *LogClient) Append(ctx context.Context, tpid types.TopicID, lsid types.L
return rsp.Results, nil
}

// AppendStream opens the bidirectional gRPC stream used to pipeline append
// requests to the storage node.
func (c *LogClient) AppendStream(ctx context.Context) (snpb.LogIO_AppendClient, error) {
return c.rpcClient.Append(ctx)
}

// Subscribe gets log entries continuously from the storage node. It guarantees that LLSNs of log
// entries taken are sequential.
func (c *LogClient) Subscribe(ctx context.Context, tpid types.TopicID, lsid types.LogStreamID, begin, end types.GLSN) (<-chan SubscribeResult, error) {
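As context for the new AppendStream method, here is a minimal sketch of how a caller can drive the returned bidirectional stream directly. The variables cl (a connected LogClient), ctx, tpid, and lsid are assumptions for illustration, and error handling is reduced to early returns.

// Sketch: pipelining a single batch over the raw append stream.
// Assumes cl is a connected *client.LogClient and tpid/lsid are valid IDs.
stream, err := cl.AppendStream(ctx)
if err != nil {
	return err
}
// Send a batch; its response arrives later on the same stream.
err = stream.Send(&snpb.AppendRequest{
	TopicID:     tpid,
	LogStreamID: lsid,
	Payload:     [][]byte{[]byte("hello"), []byte("world")},
})
if err != nil {
	return err
}
rsp := &snpb.AppendResponse{}
if err := stream.RecvMsg(rsp); err != nil {
	return err
}
// rsp.Results holds per-entry metadata or errors.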
72 changes: 72 additions & 0 deletions pkg/varlog/config.go
@@ -3,6 +3,9 @@ package varlog
import (
"context"
"time"

"github.com/kakao/varlog/pkg/types"
"github.com/kakao/varlog/proto/varlogpb"
)

type adminConfig struct {
@@ -91,3 +94,72 @@ func WithTimeout(timeout time.Duration) AdminCallOption {
cfg.timeout.set = true
})
}

const (
defaultPipelineSize = 2
minPipelineSize = 1
maxPipelineSize = 8
)

type logStreamAppenderConfig struct {
defaultBatchCallback BatchCallback
tpid types.TopicID
lsid types.LogStreamID
pipelineSize int
}

func newLogStreamAppenderConfig(opts []LogStreamAppenderOption) logStreamAppenderConfig {
cfg := logStreamAppenderConfig{
pipelineSize: defaultPipelineSize,
defaultBatchCallback: func([]varlogpb.LogEntryMeta, error) {},
}
for _, opt := range opts {
opt.applyLogStreamAppender(&cfg)
}
cfg.ensureDefault()
return cfg
}

func (cfg *logStreamAppenderConfig) ensureDefault() {
if cfg.pipelineSize < minPipelineSize {
cfg.pipelineSize = minPipelineSize
}
if cfg.pipelineSize > maxPipelineSize {
cfg.pipelineSize = maxPipelineSize
}
}

type funcLogStreamAppenderOption struct {
f func(*logStreamAppenderConfig)
}

func newFuncLogStreamAppenderOption(f func(config *logStreamAppenderConfig)) *funcLogStreamAppenderOption {
return &funcLogStreamAppenderOption{f: f}
}

func (fo *funcLogStreamAppenderOption) applyLogStreamAppender(cfg *logStreamAppenderConfig) {
fo.f(cfg)
}

// LogStreamAppenderOption configures a LogStreamAppender.
type LogStreamAppenderOption interface {
applyLogStreamAppender(config *logStreamAppenderConfig)
}

// WithPipelineSize sets the size of the request pipeline. The default is
// two; values below one are raised to one, and values above eight are
// capped at eight.
func WithPipelineSize(pipelineSize int) LogStreamAppenderOption {
return newFuncLogStreamAppenderOption(func(cfg *logStreamAppenderConfig) {
cfg.pipelineSize = pipelineSize
})
}

// WithDefaultBatchCallback sets the default callback function. It is used
// unless a non-nil callback is passed to the AppendBatch method.
func WithDefaultBatchCallback(defaultBatchCallback BatchCallback) LogStreamAppenderOption {
return newFuncLogStreamAppenderOption(func(cfg *logStreamAppenderConfig) {
cfg.defaultBatchCallback = defaultBatchCallback
})
}
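As a usage sketch (not part of this diff), the options above can be combined when creating an appender. The names vlog, tpid, and lsid are assumptions: an open varlog.Log client and an existing topic and log stream.

// Sketch: creating a LogStreamAppender with a larger pipeline and a
// default callback. Assumes the log and varlogpb packages are imported.
lsa, err := vlog.NewLogStreamAppender(tpid, lsid,
	varlog.WithPipelineSize(4), // clamped into [1, 8]
	varlog.WithDefaultBatchCallback(func(metas []varlogpb.LogEntryMeta, err error) {
		if err != nil {
			log.Printf("append failed: %v", err)
		}
	}),
)
if err != nil {
	return err
}
defer lsa.Close()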
5 changes: 5 additions & 0 deletions pkg/varlog/errors.go
@@ -0,0 +1,5 @@
package varlog

import "errors"

var ErrClosed = errors.New("client: closed")
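ErrClosed is the only error AppendBatch returns synchronously, so callers can check for it with errors.Is. A brief sketch, assuming lsa and batch already exist:

// Sketch: distinguishing a closed appender from other failures.
if err := lsa.AppendBatch(batch, nil); err != nil {
	if errors.Is(err, varlog.ErrClosed) {
		return nil // the appender was closed; stop producing
	}
	return err
}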
7 changes: 7 additions & 0 deletions pkg/varlog/log.go
@@ -42,6 +42,9 @@ type Log interface {
// replica. If none of the replicas' statuses is either appendable or
// sealed, it returns an error.
PeekLogStream(ctx context.Context, tpid types.TopicID, lsid types.LogStreamID) (first varlogpb.LogSequenceNumber, last varlogpb.LogSequenceNumber, err error)

// NewLogStreamAppender returns a new LogStreamAppender.
NewLogStreamAppender(tpid types.TopicID, lsid types.LogStreamID, opts ...LogStreamAppenderOption) (LogStreamAppender, error)
}

type AppendResult struct {
@@ -177,6 +180,10 @@ func (v *logImpl) PeekLogStream(ctx context.Context, tpid types.TopicID, lsid ty
return v.peekLogStream(ctx, tpid, lsid)
}

func (v *logImpl) NewLogStreamAppender(tpid types.TopicID, lsid types.LogStreamID, opts ...LogStreamAppenderOption) (LogStreamAppender, error) {
return v.newLogStreamAppender(context.Background(), tpid, lsid, opts...)
}

func (v *logImpl) Close() (err error) {
if v.closed.Load() {
return
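A hedged end-to-end sketch of the new Log method: create an appender, submit one batch with a per-call callback (which takes precedence over the default callback), wait for the result, and close. The names vlog, tpid, and lsid are assumptions, and the sync and log packages are assumed to be imported.

// Sketch: asynchronous append through NewLogStreamAppender.
lsa, err := vlog.NewLogStreamAppender(tpid, lsid)
if err != nil {
	return err
}
defer lsa.Close()

var wg sync.WaitGroup
wg.Add(1)
err = lsa.AppendBatch(
	[][]byte{[]byte("event-1"), []byte("event-2")},
	func(metas []varlogpb.LogEntryMeta, cerr error) {
		defer wg.Done() // keep the callback lightweight
		if cerr != nil {
			log.Printf("batch failed: %v", cerr)
			return
		}
		log.Printf("appended %d entries", len(metas))
	},
)
if err != nil { // only ErrClosed is returned synchronously
	return err
}
wg.Wait()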
20 changes: 20 additions & 0 deletions pkg/varlog/log_mock.go

(Generated file; diff not rendered.)

201 changes: 201 additions & 0 deletions pkg/varlog/log_stream_appender.go
@@ -0,0 +1,201 @@
package varlog

import (
"context"
"errors"
"fmt"
"sync"

"github.com/puzpuzpuz/xsync/v2"

"github.com/kakao/varlog/pkg/types"
"github.com/kakao/varlog/proto/snpb"
"github.com/kakao/varlog/proto/varlogpb"
)

// LogStreamAppender is a write-only client that appends log entries to a
// particular log stream.
type LogStreamAppender interface {
// AppendBatch appends dataBatch to the log stream asynchronously. Calls do
// not block while the pipeline of the LogStreamAppender has free slots; once
// the pipeline is full, AppendBatch blocks until a response is received from
// the storage node.
// When a batch completes, the callback provided by the caller is invoked.
// All callbacks registered to the same LogStreamAppender are called
// sequentially by a single goroutine, so callbacks should be lightweight;
// heavy work should be offloaded to separate worker goroutines.
// The only error AppendBatch returns is ErrClosed, reported when the
// LogStreamAppender is already closed. Even if the underlying stream is
// disconnected, AppendBatch returns nil and delivers the error through the
// callback.
// It is safe for multiple goroutines to call AppendBatch concurrently, but
// the ordering between them is not guaranteed.
AppendBatch(dataBatch [][]byte, callback BatchCallback) error

// Close closes the LogStreamAppender client. Once the client is closed,
// calling AppendBatch fails immediately. If AppendBatch is still waiting
// for room in the pipeline, Close blocks until that space is released. It
// also waits for all pending callbacks to be invoked.
// Users should not call Close from within a callback function, since doing
// so may block indefinitely.
Close()
}

// BatchCallback is a callback function that delivers the result of an
// AppendBatch call.
type BatchCallback func([]varlogpb.LogEntryMeta, error)

type cbQueueEntry struct {
cb BatchCallback
err error
}

func newCallbackQueueEntry() *cbQueueEntry {
return callbackQueueEntryPool.Get().(*cbQueueEntry)
}

func (cqe *cbQueueEntry) Release() {
*cqe = cbQueueEntry{}
callbackQueueEntryPool.Put(cqe)
}

var callbackQueueEntryPool = sync.Pool{
New: func() any {
return &cbQueueEntry{}
},
}

type logStreamAppender struct {
logStreamAppenderConfig
stream snpb.LogIO_AppendClient
cancelFunc context.CancelCauseFunc
sema chan struct{}
cbq chan *cbQueueEntry
wg sync.WaitGroup
closed struct {
xsync.RBMutex
value bool
}
}

var _ LogStreamAppender = (*logStreamAppender)(nil)

func (v *logImpl) newLogStreamAppender(ctx context.Context, tpid types.TopicID, lsid types.LogStreamID, opts ...LogStreamAppenderOption) (LogStreamAppender, error) {
replicas, ok := v.replicasRetriever.Retrieve(tpid, lsid)
if !ok {
return nil, fmt.Errorf("client: log stream %d of topic %d does not exist", lsid, tpid)
}

snid := replicas[0].StorageNodeID
addr := replicas[0].Address
cl, err := v.logCLManager.GetOrConnect(ctx, snid, addr)
if err != nil {
v.allowlist.Deny(tpid, lsid)
return nil, fmt.Errorf("client: %w", err)
}

ctx, cancelFunc := context.WithCancelCause(ctx)
stream, err := cl.AppendStream(ctx)
if err != nil {
cancelFunc(err)
return nil, fmt.Errorf("client: %w", err)
}

cfg := newLogStreamAppenderConfig(opts)
cfg.tpid = tpid
cfg.lsid = lsid
lsa := &logStreamAppender{
logStreamAppenderConfig: cfg,
stream: stream,
sema: make(chan struct{}, cfg.pipelineSize),
cbq: make(chan *cbQueueEntry, cfg.pipelineSize),
cancelFunc: cancelFunc,
}
lsa.wg.Add(1)
go lsa.recvLoop()
return lsa, nil
}

func (lsa *logStreamAppender) AppendBatch(dataBatch [][]byte, callback BatchCallback) error {
rt := lsa.closed.RLock()
defer lsa.closed.RUnlock(rt)
if lsa.closed.value {
return ErrClosed
}

lsa.sema <- struct{}{}

qe := newCallbackQueueEntry()
qe.cb = callback

err := lsa.stream.Send(&snpb.AppendRequest{
TopicID: lsa.tpid,
LogStreamID: lsa.lsid,
Payload: dataBatch,
})
if err != nil {
_ = lsa.stream.CloseSend()
qe.err = err
}
lsa.cbq <- qe
return nil
}

func (lsa *logStreamAppender) Close() {
lsa.cancelFunc(nil)

lsa.closed.Lock()
defer lsa.closed.Unlock()
if lsa.closed.value {
return
}
lsa.closed.value = true

close(lsa.cbq)
lsa.wg.Wait()
}

func (lsa *logStreamAppender) recvLoop() {
defer lsa.wg.Done()

var err error
var meta []varlogpb.LogEntryMeta
var cb BatchCallback
rsp := &snpb.AppendResponse{}

for qe := range lsa.cbq {
meta = nil
err = qe.err
if err != nil {
goto Call
}

rsp.Reset()
err = lsa.stream.RecvMsg(rsp)
if err != nil {
goto Call
}

meta = make([]varlogpb.LogEntryMeta, len(rsp.Results))
for idx, res := range rsp.Results {
if len(res.Error) == 0 {
meta[idx] = res.Meta
continue
}
err = errors.New(res.Error)
break
}
Call:
if qe.cb != nil {
cb = qe.cb
} else {
cb = lsa.defaultBatchCallback
}
if cb != nil {
cb(meta, err)
}
<-lsa.sema
}
}
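The implementation above bounds the number of in-flight batches with a semaphore channel (sema) and funnels completions through a callback queue (cbq) consumed by a single goroutine, which is what guarantees sequential callbacks. The following standalone sketch shows that pattern in isolation; it is illustrative only and not part of the diff.

package main

import (
	"fmt"
	"sync"
)

func main() {
	const pipelineSize = 2
	sema := make(chan struct{}, pipelineSize) // bounds in-flight requests
	cbq := make(chan func(), pipelineSize)    // completions, FIFO

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for cb := range cbq {
			cb()   // callbacks run one at a time, in send order
			<-sema // release a pipeline slot
		}
	}()

	for i := 0; i < 5; i++ {
		sema <- struct{}{} // blocks while pipelineSize requests are pending
		i := i
		cbq <- func() { fmt.Println("batch", i, "acknowledged") }
	}
	close(cbq)
	wg.Wait()
}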
1 change: 1 addition & 0 deletions pkg/varlog/log_stream_appender_test.go
@@ -0,0 +1 @@
package varlog