Skip to content

Commit

Permalink
fix(varlogtest): log stream appender can be closed in the callback (#595
Browse files Browse the repository at this point in the history
)

### What this PR does

This PR fixed the varlogtest to be able to close the log stream appender in the callback.
  • Loading branch information
ijsong authored Oct 4, 2023
2 parents d2e88ee + e08371b commit 2480922
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 6 deletions.
44 changes: 39 additions & 5 deletions pkg/varlogtest/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,26 @@ import (
type testLog struct {
vt *VarlogTest
closed bool

lsaPool struct {
lsaMap map[int64]*logStreamAppender
nextID int64
mu sync.Mutex

wg sync.WaitGroup
}
}

var _ varlog.Log = (*testLog)(nil)

func newTestLog(vt *VarlogTest) *testLog {
c := &testLog{
vt: vt,
}
c.lsaPool.lsaMap = make(map[int64]*logStreamAppender)
return c
}

func (c *testLog) lock() error {
c.vt.cond.L.Lock()
if c.closed {
Expand All @@ -38,6 +54,14 @@ func (c *testLog) Close() error {
defer c.vt.cond.L.Unlock()
c.closed = true
c.vt.cond.Broadcast()

c.lsaPool.mu.Lock()
defer c.lsaPool.mu.Unlock()
for _, lsa := range c.lsaPool.lsaMap {
lsa.Close()
}

c.lsaPool.wg.Wait()
return nil
}

Expand Down Expand Up @@ -342,9 +366,22 @@ func (c *testLog) NewLogStreamAppender(tpid types.TopicID, lsid types.LogStreamI
lsa.queue.ch = make(chan *queueEntry, pipelineSize)
lsa.queue.cv = sync.NewCond(&lsa.queue.mu)

lsa.wg.Add(1)
c.lsaPool.mu.Lock()
defer c.lsaPool.mu.Unlock()
id := c.lsaPool.nextID
c.lsaPool.nextID++
c.lsaPool.lsaMap[id] = lsa

c.lsaPool.wg.Add(1)
go func() {
defer lsa.wg.Done()
defer func() {
c.lsaPool.wg.Done()

c.lsaPool.mu.Lock()
defer c.lsaPool.mu.Unlock()
delete(c.lsaPool.lsaMap, id)
}()

for qe := range lsa.queue.ch {
qe.callback(qe.result.Metadata, qe.result.Err)
lsa.queue.cv.L.Lock()
Expand Down Expand Up @@ -377,8 +414,6 @@ type logStreamAppender struct {
cv *sync.Cond
mu sync.Mutex
}

wg sync.WaitGroup
}

var _ varlog.LogStreamAppender = (*logStreamAppender)(nil)
Expand Down Expand Up @@ -417,7 +452,6 @@ func (lsa *logStreamAppender) Close() {
}
lsa.closed.value = true
close(lsa.queue.ch)
lsa.wg.Wait()
}

type errSubscriber struct {
Expand Down
2 changes: 1 addition & 1 deletion pkg/varlogtest/varlogtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (vt *VarlogTest) NewAdminClient() varlog.Admin {
}

func (vt *VarlogTest) NewLogClient() varlog.Log {
return &testLog{vt: vt}
return newTestLog(vt)
}

func (vt *VarlogTest) generateTopicID() types.TopicID {
Expand Down
33 changes: 33 additions & 0 deletions pkg/varlogtest/varlogtest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,39 @@ func TestVarlotTest_LogStreamAppender(t *testing.T) {
require.NoError(t, err)
},
},
{
name: "CloseInCallback",
testf: func(t *testing.T, vadm varlog.Admin, vcli varlog.Log, tpid types.TopicID, lsid types.LogStreamID) {
lsa, err := vcli.NewLogStreamAppender(tpid, lsid)
require.NoError(t, err)

var wg sync.WaitGroup
wg.Add(1)
err = lsa.AppendBatch([][]byte{[]byte("foo")}, func(lem []varlogpb.LogEntryMeta, err error) {
defer wg.Done()
assert.NoError(t, err)
lsa.Close()
})
require.NoError(t, err)
wg.Wait()
},
},
{
name: "DoesNotCloseLogStreamAppender",
testf: func(t *testing.T, vadm varlog.Admin, vcli varlog.Log, tpid types.TopicID, lsid types.LogStreamID) {
// Closing the log client will shut down the log stream appender forcefully.
lsa, err := vcli.NewLogStreamAppender(tpid, lsid)
require.NoError(t, err)

cb := func(_ []varlogpb.LogEntryMeta, err error) {
assert.NoError(t, err)
}
for i := 0; i < numLogs; i++ {
err := lsa.AppendBatch([][]byte{[]byte("foo")}, cb)
require.NoError(t, err)
}
},
},
{
name: "Manager",
testf: func(t *testing.T, vadm varlog.Admin, vcli varlog.Log, tpid types.TopicID, lsid types.LogStreamID) {
Expand Down

0 comments on commit 2480922

Please sign in to comment.