diff --git a/pkg/varlog/x/mlsa/manager.go b/pkg/varlog/x/mlsa/manager.go index f39840ab9..b2ac25eca 100644 --- a/pkg/varlog/x/mlsa/manager.go +++ b/pkg/varlog/x/mlsa/manager.go @@ -27,6 +27,7 @@ func (m *managedLSA) Close() { appenders, ok := m.mgr.mlsas[m.tpid] if ok && appenders[m.lsid] == m { delete(appenders, m.lsid) + m.mgr.count-- } m.mgr.mu.Unlock() @@ -41,6 +42,7 @@ func (m *managedLSA) Close() { // by calling the Get function again. type Manager struct { mlsas map[types.TopicID]map[types.LogStreamID]*managedLSA + count int mu *xsync.RBMutex vcli varlog.Log @@ -94,6 +96,7 @@ func (mgr *Manager) getSlow(tpid types.TopicID, lsid types.LogStreamID) (varlog. mgr: mgr, } appenders[lsid] = mlsa + mgr.count++ } return mlsa, nil } @@ -127,3 +130,37 @@ func (mgr *Manager) Any(tpid types.TopicID, allowlist map[types.LogStreamID]stru } return nil, errors.New("no appendable log stream") } + +// Clear closes all the managed LogStreamAppender, and clears them. Clients can +// continue to use this Manager after calling Clear. +// +// After using the Manager, clients should call Clear to release any associated +// resources. +func (mgr *Manager) Clear() { + lsas := mgr.clear() + for _, lsa := range lsas { + lsa.Close() + } +} + +func (mgr *Manager) clear() []varlog.LogStreamAppender { + mgr.mu.Lock() + defer func() { + mgr.count = 0 + mgr.mu.Unlock() + }() + + if len(mgr.mlsas) == 0 { + return nil + } + + lsas := make([]varlog.LogStreamAppender, 0, mgr.count) + for tpid, appenders := range mgr.mlsas { + for lsid, mlsa := range appenders { + lsas = append(lsas, mlsa.lsa) + delete(appenders, lsid) + } + delete(mgr.mlsas, tpid) + } + return lsas +} diff --git a/tests/it/cluster/client_test.go b/tests/it/cluster/client_test.go index f6d78f71e..199bb13b2 100644 --- a/tests/it/cluster/client_test.go +++ b/tests/it/cluster/client_test.go @@ -1090,6 +1090,44 @@ func TestLogStreamAppender(t *testing.T) { lsa.Close() }, }, + { + name: "Manager_Clear", + testf: func(t *testing.T, tpid types.TopicID, lsid types.LogStreamID, vcli varlog.Log) { + mgr := mlsa.New(vcli) + + lsa, err := mgr.Get(tpid, lsid) + require.NoError(t, err) + + var wg sync.WaitGroup + dataBatch := [][]byte{[]byte("foo")} + wg.Add(1) + err = lsa.AppendBatch(dataBatch, func(_ []varlogpb.LogEntryMeta, err error) { + defer wg.Done() + assert.NoError(t, err) + }) + require.NoError(t, err) + wg.Wait() + + mgr.Clear() + err = lsa.AppendBatch(dataBatch, func([]varlogpb.LogEntryMeta, error) { + assert.Fail(t, "unexpected callback") + }) + require.Error(t, err) + + lsa, err = mgr.Get(tpid, lsid) + require.NoError(t, err) + + wg.Add(1) + err = lsa.AppendBatch(dataBatch, func(_ []varlogpb.LogEntryMeta, err error) { + defer wg.Done() + assert.NoError(t, err) + }) + require.NoError(t, err) + wg.Wait() + + mgr.Clear() + }, + }, } for _, tc := range tcs {