Skip to content

Commit

Permalink
fix(storagenode): remove data directory in removing log stream replica
Browse files Browse the repository at this point in the history
Resolves kakao#157
  • Loading branch information
ijsong committed Sep 20, 2022
1 parent 8e07001 commit 9db1aed
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 5 deletions.
5 changes: 5 additions & 0 deletions internal/storagenode/logstream/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,11 @@ func (lse *Executor) Trim(_ context.Context, glsn types.GLSN) error {
return nil
}

// Path returns the data directory where the replica stores its data.
func (lse *Executor) Path() string {
return lse.stg.Path()
}

func (lse *Executor) Metrics() *telemetry.LogStreamMetrics {
return lse.lsm
}
Expand Down
12 changes: 7 additions & 5 deletions internal/storagenode/storagenode.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"net"
"net/http"
"os"
"path"
"strings"
"sync"
Expand Down Expand Up @@ -341,12 +342,13 @@ func (sn *StorageNode) removeLogStreamReplica(_ context.Context, tpid types.Topi
if !loaded {
return verrors.ErrNotExist
}
_ = lse.Close()
if err := lse.Close(); err != nil {
sn.logger.Warn("error while closing log stream replica")
}
telemetry.UnregisterLogStreamMetrics(sn.metrics, lsid)
// TODO (jun): Is removing data path optional or default behavior?
// if err := os.RemoveAll(lse.StorageNodePath()); err != nil {
// sn.logger.Warn("error while removing log stream path")
// }
if err := os.RemoveAll(lse.Path()); err != nil {
sn.logger.Warn("error while removing log stream path")
}
return nil
}

Expand Down
63 changes: 63 additions & 0 deletions internal/storagenode/storagenode_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package storagenode

import (
"context"
"io/fs"
"math/rand"
"os"
"path/filepath"
Expand All @@ -10,9 +12,12 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/kakao/varlog/internal/reportcommitter"
"github.com/kakao/varlog/internal/storagenode/client"
"github.com/kakao/varlog/pkg/types"
"github.com/kakao/varlog/pkg/verrors"
"github.com/kakao/varlog/proto/snpb"
"github.com/kakao/varlog/proto/varlogpb"
)
Expand Down Expand Up @@ -430,3 +435,61 @@ func TestStorageNode_MakeVolumesAbsolute(t *testing.T) {
assert.True(t, filepath.IsAbs(volume))
}
}

func TestStorageNode_RemoveLogStreamReplica(t *testing.T) {
const (
tpid = types.TopicID(1)
lsid = types.LogStreamID(1)
)

tcs := []struct {
name string
testf func(t *testing.T, snpath string, mc *client.ManagementClient)
}{
{
name: "Succeed",
testf: func(t *testing.T, snpath string, mc *client.ManagementClient) {
ctx := context.Background()
lsrmd, err := mc.AddLogStreamReplica(ctx, tpid, lsid, snpath)
require.NoError(t, err)
_, err = os.ReadDir(lsrmd.Path)
require.NoError(t, err)

err = mc.RemoveLogStream(ctx, tpid, lsid)
require.NoError(t, err)
_, err = os.ReadDir(lsrmd.Path)
require.ErrorIs(t, err, fs.ErrNotExist)
},
},
{
name: "NotFound",
testf: func(t *testing.T, _ string, mc *client.ManagementClient) {
ctx := context.Background()
err := mc.RemoveLogStream(ctx, tpid, lsid)
require.ErrorIs(t, err, verrors.ErrNotExist)
},
},
}

for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
sn := TestNewSimpleStorageNode(t)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
assert.NoError(t, sn.Serve())
}()
defer func() {
assert.NoError(t, sn.Close())
wg.Wait()
}()

addr := TestGetAdvertiseAddress(t, sn)
mc, mcClose := TestNewManagementClient(t, sn.cid, sn.snid, addr)
defer mcClose()

tc.testf(t, sn.snPaths[0], mc)
})
}
}

0 comments on commit 9db1aed

Please sign in to comment.