Skip to content

Commit

Permalink
Merge pull request #1327 from aaronlehmann/gc-wals
Browse files Browse the repository at this point in the history
raft: Garbage collect WAL files
  • Loading branch information
abronan authored Aug 10, 2016
2 parents cb6d813 + c72cdf1 commit f0e46aa
Show file tree
Hide file tree
Showing 2 changed files with 274 additions and 15 deletions.
109 changes: 94 additions & 15 deletions manager/state/raft/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"sort"
"strings"

"github.com/coreos/etcd/pkg/fileutil"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/snap"
Expand Down Expand Up @@ -80,7 +81,7 @@ func (n *Node) createWAL(nodeID string) (raft.Peer, error) {
}
n.wal, err = wal.Create(n.walDir(), metadata)
if err != nil {
return raft.Peer{}, fmt.Errorf("create wal error: %v", err)
return raft.Peer{}, fmt.Errorf("create WAL error: %v", err)
}

n.cluster.AddMember(&membership.Member{RaftMember: raftNode})
Expand Down Expand Up @@ -127,15 +128,15 @@ func (n *Node) readWAL(ctx context.Context, snapshot *raftpb.Snapshot, forceNewC
repaired := false
for {
if n.wal, err = wal.Open(n.walDir(), walsnap); err != nil {
return fmt.Errorf("open wal error: %v", err)
return fmt.Errorf("open WAL error: %v", err)
}
if metadata, st, ents, err = n.wal.ReadAll(); err != nil {
if err := n.wal.Close(); err != nil {
return err
}
// we can only repair ErrUnexpectedEOF and we never repair twice.
if repaired || err != io.ErrUnexpectedEOF {
return fmt.Errorf("read wal error (%v) and cannot be repaired", err)
return fmt.Errorf("read WAL error (%v) and cannot be repaired", err)
}
if !wal.Repair(n.walDir()) {
return fmt.Errorf("WAL error (%v) cannot be repaired", err)
Expand All @@ -157,7 +158,7 @@ func (n *Node) readWAL(ctx context.Context, snapshot *raftpb.Snapshot, forceNewC

var raftNode api.RaftMember
if err := raftNode.Unmarshal(metadata); err != nil {
return fmt.Errorf("error unmarshalling wal metadata: %v", err)
return fmt.Errorf("error unmarshalling WAL metadata: %v", err)
}
n.Config.ID = raftNode.RaftID

Expand Down Expand Up @@ -274,25 +275,103 @@ func (n *Node) saveSnapshot(snapshot raftpb.Snapshot, keepOldSnapshots uint64) e
// This means that if the current snapshot doesn't appear in the
// directory for some strange reason, we won't delete anything, which
// is the safe behavior.
curSnapshotIdx := -1
var (
afterCurSnapshot bool
removeErr error
removeErr error
oldestSnapshot string
)

for i, snapFile := range snapshots {
if afterCurSnapshot {
if uint64(len(snapshots)-i) <= keepOldSnapshots {
return removeErr
}
err := os.Remove(filepath.Join(n.snapDir(), snapFile))
if err != nil && removeErr == nil {
removeErr = err
if curSnapshotIdx >= 0 && i > curSnapshotIdx {
if uint64(i-curSnapshotIdx) > keepOldSnapshots {
err := os.Remove(filepath.Join(n.snapDir(), snapFile))
if err != nil && removeErr == nil {
removeErr = err
}
continue
}
} else if snapFile == curSnapshot {
afterCurSnapshot = true
curSnapshotIdx = i
}
oldestSnapshot = snapFile
}

if removeErr != nil {
return removeErr
}

// Remove any WAL files that only contain data from before the oldest
// remaining snapshot.

if oldestSnapshot == "" {
return nil
}

// Parse index out of oldest snapshot's filename
var snapTerm, snapIndex uint64
_, err = fmt.Sscanf(oldestSnapshot, "%016x-%016x.snap", &snapTerm, &snapIndex)
if err != nil {
return fmt.Errorf("malformed snapshot filename %s: %v", oldestSnapshot, err)
}

// List the WALs
dirents, err = ioutil.ReadDir(n.walDir())
if err != nil {
return err
}

var wals []string
for _, dirent := range dirents {
if strings.HasSuffix(dirent.Name(), ".wal") {
wals = append(wals, dirent.Name())
}
}

return removeErr
// Sort WAL filenames in lexical order
sort.Sort(sort.StringSlice(wals))

found := false
deleteUntil := -1

for i, walName := range wals {
var walSeq, walIndex uint64
_, err = fmt.Sscanf(walName, "%016x-%016x.wal", &walSeq, &walIndex)
if err != nil {
return fmt.Errorf("could not parse WAL name %s: %v", walName, err)
}

if walIndex >= snapIndex {
deleteUntil = i - 1
found = true
break
}
}

// If all WAL files started with indices below the oldest snapshot's
// index, we can delete all but the newest WAL file.
if !found && len(wals) != 0 {
deleteUntil = len(wals) - 1
}

for i := 0; i < deleteUntil; i++ {
walPath := filepath.Join(n.walDir(), wals[i])
l, err := fileutil.NewLock(walPath)
if err != nil {
continue
}
err = l.TryLock()
if err != nil {
return fmt.Errorf("could not lock old WAL file %s for removal: %v", wals[i], err)
}
err = os.Remove(walPath)
l.Unlock()
l.Destroy()
if err != nil {
return fmt.Errorf("error removing old WAL file %s: %v", wals[i], err)
}
}

return nil
}

func (n *Node) doSnapshot(raftConfig *api.RaftConfig) {
Expand Down
180 changes: 180 additions & 0 deletions manager/state/raft/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@ import (
"io/ioutil"
"path/filepath"
"testing"
"time"

"github.com/docker/swarmkit/api"
raftutils "github.com/docker/swarmkit/manager/state/raft/testutils"
"github.com/docker/swarmkit/manager/state/store"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/net/context"
)

func TestRaftSnapshot(t *testing.T) {
Expand Down Expand Up @@ -243,3 +246,180 @@ func TestRaftSnapshotRestart(t *testing.T) {
require.NoError(t, err)
raftutils.CheckValuesOnNodes(t, clockSource, nodes, nodeIDs, values)
}

func TestGCWAL(t *testing.T) {
if testing.Short() {
t.Skip("TestGCWAL skipped with -short because it's resource-intensive")
}
t.Parallel()

// Additional log entries from cluster setup, leader election
extraLogEntries := 5
// Number of large entries to propose
proposals := 47

// Bring up a 3 node cluster
nodes, clockSource := raftutils.NewRaftCluster(t, tc, &api.RaftConfig{SnapshotInterval: uint64(proposals + extraLogEntries), LogEntriesForSlowFollowers: 0})

for i := 0; i != proposals; i++ {
_, err := proposeHugeValue(t, nodes[1], DefaultProposalTime, fmt.Sprintf("id%d", i))
assert.NoError(t, err, "failed to propose value")
}

time.Sleep(250 * time.Millisecond)

// Snapshot should have been triggered just before the WAL rotated, so
// both WAL files should be preserved
assert.NoError(t, raftutils.PollFunc(clockSource, func() error {
dirents, err := ioutil.ReadDir(filepath.Join(nodes[1].StateDir, "snap"))
if err != nil {
return err
}
if len(dirents) != 1 {
return fmt.Errorf("expected 1 snapshot, found %d", len(dirents))
}

dirents, err = ioutil.ReadDir(filepath.Join(nodes[1].StateDir, "wal"))
if err != nil {
return err
}
if len(dirents) != 2 {
return fmt.Errorf("expected 2 WAL files, found %d", len(dirents))
}
return nil
}))

raftutils.TeardownCluster(t, nodes)

// Repeat this test, but trigger the snapshot after the WAL has rotated
proposals++
nodes, clockSource = raftutils.NewRaftCluster(t, tc, &api.RaftConfig{SnapshotInterval: uint64(proposals + extraLogEntries), LogEntriesForSlowFollowers: 0})
defer raftutils.TeardownCluster(t, nodes)

for i := 0; i != proposals; i++ {
_, err := proposeHugeValue(t, nodes[1], DefaultProposalTime, fmt.Sprintf("id%d", i))
assert.NoError(t, err, "failed to propose value")
}

time.Sleep(250 * time.Millisecond)

// This time only one WAL file should be saved.
assert.NoError(t, raftutils.PollFunc(clockSource, func() error {
dirents, err := ioutil.ReadDir(filepath.Join(nodes[1].StateDir, "snap"))
if err != nil {
return err
}
if len(dirents) != 1 {
return fmt.Errorf("expected 1 snapshot, found %d", len(dirents))
}

dirents, err = ioutil.ReadDir(filepath.Join(nodes[1].StateDir, "wal"))
if err != nil {
return err
}
if len(dirents) != 1 {
return fmt.Errorf("expected 1 WAL file, found %d", len(dirents))
}
return nil
}))

// Restart the whole cluster
for _, node := range nodes {
node.Server.Stop()
node.Shutdown()
}

raftutils.AdvanceTicks(clockSource, 5)

i := 0
for k, node := range nodes {
nodes[k] = raftutils.RestartNode(t, clockSource, node, false)
i++
}
raftutils.WaitForCluster(t, clockSource, nodes)

// Is the data intact after restart?
for _, node := range nodes {
assert.NoError(t, raftutils.PollFunc(clockSource, func() error {
var err error
node.MemoryStore().View(func(tx store.ReadTx) {
var allNodes []*api.Node
allNodes, err = store.FindNodes(tx, store.All)
if err != nil {
return
}
if len(allNodes) != proposals {
err = fmt.Errorf("expected %d nodes, got %d", proposals, len(allNodes))
return
}
})
return err
}))
}

// It should still be possible to propose values
_, err := raftutils.ProposeValue(t, raftutils.Leader(nodes), DefaultProposalTime, "newnode")
assert.NoError(t, err, "failed to propose value")

for _, node := range nodes {
assert.NoError(t, raftutils.PollFunc(clockSource, func() error {
var err error
node.MemoryStore().View(func(tx store.ReadTx) {
var allNodes []*api.Node
allNodes, err = store.FindNodes(tx, store.All)
if err != nil {
return
}
if len(allNodes) != proposals+1 {
err = fmt.Errorf("expected %d nodes, got %d", proposals, len(allNodes))
return
}
})
return err
}))
}
}

// proposeHugeValue proposes a 1.4MB value to a raft test cluster
func proposeHugeValue(t *testing.T, raftNode *raftutils.TestNode, time time.Duration, nodeID ...string) (*api.Node, error) {
nodeIDStr := "id1"
if len(nodeID) != 0 {
nodeIDStr = nodeID[0]
}
a := make([]byte, 1400000)
for i := 0; i != len(a); i++ {
a[i] = 'a'
}
node := &api.Node{
ID: nodeIDStr,
Spec: api.NodeSpec{
Annotations: api.Annotations{
Name: nodeIDStr,
Labels: map[string]string{
"largestring": string(a),
},
},
},
}

storeActions := []*api.StoreAction{
{
Action: api.StoreActionKindCreate,
Target: &api.StoreAction_Node{
Node: node,
},
},
}

ctx, _ := context.WithTimeout(context.Background(), time)

err := raftNode.ProposeValue(ctx, storeActions, func() {
err := raftNode.MemoryStore().ApplyStoreActions(storeActions)
assert.NoError(t, err, "error applying actions")
})
if err != nil {
return nil, err
}

return node, nil
}

0 comments on commit f0e46aa

Please sign in to comment.