Feat: cmd-warmup add option "check" and "evict" #4370

Merged · 8 commits · Jan 23, 2024
86 changes: 71 additions & 15 deletions cmd/warmup.go
@@ -19,6 +19,7 @@ package cmd
import (
"bufio"
"encoding/binary"
"encoding/json"
"fmt"
"io"
"os"
@@ -28,8 +29,10 @@ import (
"syscall"
"time"

"github.com/dustin/go-humanize"
"github.com/juicedata/juicefs/pkg/meta"
"github.com/juicedata/juicefs/pkg/utils"
"github.com/juicedata/juicefs/pkg/vfs"
"github.com/urfave/cli/v2"
)

@@ -71,6 +74,14 @@ $ juicefs warmup -f /tmp/filelist`,
Aliases: []string{"b"},
Usage: "run in background",
},
&cli.BoolFlag{
Name: "evict",
Usage: "evict cached blocks",
},
&cli.BoolFlag{
Name: "check",
Usage: "check whether the data blocks are cached or not",
},
},
}
}
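
With these flags in place, the same warmup entry point can also probe or drop the local cache instead of filling it. A quick usage sketch (the mount path is illustrative; --check and --evict are mutually exclusive, as enforced below):

$ juicefs warmup /mnt/jfs/dataset          # fill the local cache
$ juicefs warmup --check /mnt/jfs/dataset  # report how much of it is already cached
$ juicefs warmup --evict /mnt/jfs/dataset  # drop the cached blocks again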
@@ -128,35 +139,59 @@ END:
}

// send fill-cache command to controller file
func sendCommand(cf *os.File, batch []string, threads uint, background bool, dspin *utils.DoubleSpinner) {
func sendCommand(cf *os.File, action vfs.CacheAction, batch []string, threads uint, background bool, dspin *utils.DoubleSpinner) vfs.CacheResponse {
paths := strings.Join(batch, "\n")
var back uint8
if background {
back = 1
}
wb := utils.NewBuffer(8 + 4 + 3 + uint32(len(paths)))
headerLen, bodyLen := uint32(8), uint32(4+len(paths)+2+1+1)
wb := utils.NewBuffer(headerLen + bodyLen)
wb.Put32(meta.FillCache)
wb.Put32(4 + 3 + uint32(len(paths)))
wb.Put32(bodyLen)

wb.Put32(uint32(len(paths)))
wb.Put([]byte(paths))
wb.Put16(uint16(threads))
wb.Put8(back)
wb.Put8(uint8(action))

if _, err := cf.Write(wb.Bytes()); err != nil {
logger.Fatalf("Write message: %s", err)
}

var resp vfs.CacheResponse
if background {
logger.Infof("Warm-up cache for %d paths in background", len(batch))
return
logger.Infof("%s for %d paths in background", action, len(batch))
return resp
}
if _, errno := readProgress(cf, func(count, bytes uint64) {
dspin.SetCurrent(int64(count), int64(bytes))
}); errno != 0 {
logger.Fatalf("Warm up failed: %s", errno)

lastCnt, lastBytes := dspin.Current()
data, errno := readProgress(cf, func(fileCount, totalBytes uint64) {
dspin.SetCurrent(lastCnt+int64(fileCount), lastBytes+int64(totalBytes))
})

if errno != 0 {
logger.Fatalf("%s failed: %s", action, errno)
}

err := json.Unmarshal(data, &resp)
if err != nil {
logger.Fatalf("unmarshal error: %s", err)
}

return resp
}
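
For reference, the request written to the controller file is a small length-prefixed message: a header (opcode plus body length) followed by the path list, thread count, background flag, and the new action byte. The sketch below mirrors the Put* calls above; the big-endian byte order is an assumption about utils.Buffer, and buildFillCacheRequest is an illustrative helper, not part of the code base.

package main

import (
	"bytes"
	"encoding/binary"
)

func buildFillCacheRequest(opcode uint32, paths string, threads uint16, background, action uint8) []byte {
	body := &bytes.Buffer{}
	_ = binary.Write(body, binary.BigEndian, uint32(len(paths))) // length of the path list
	body.WriteString(paths)                                      // paths joined by '\n'
	_ = binary.Write(body, binary.BigEndian, threads)            // concurrent workers
	body.WriteByte(background)                                   // 1 = run in background
	body.WriteByte(action)                                       // warmup / evict / check

	msg := &bytes.Buffer{}
	_ = binary.Write(msg, binary.BigEndian, opcode)             // meta.FillCache
	_ = binary.Write(msg, binary.BigEndian, uint32(body.Len())) // body length
	msg.Write(body.Bytes())
	return msg.Bytes()
}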

func warmup(ctx *cli.Context) error {
setup(ctx, 0)

evict := ctx.Bool("evict")
check := ctx.Bool("check")
if evict && check {
logger.Fatalf("--check and --evict can't be used together")
}

var paths []string
for _, p := range ctx.Args().Slice() {
if abs, err := filepath.Abs(p); err == nil {
@@ -186,7 +221,7 @@ func warmup(ctx *cli.Context) error {
}
}
if len(paths) == 0 {
logger.Infof("Nothing to warm up")
logger.Infof("no path")
return nil
}

@@ -214,11 +249,20 @@ func warmup(ctx *cli.Context) error {
logger.Warnf("threads should be larger than 0, reset it to 1")
threads = 1
}

action := vfs.WarmupCache
if evict {
action = vfs.EvictCache
} else if check {
action = vfs.CheckCache
}

background := ctx.Bool("background")
start := len(mp)
batch := make([]string, 0, batchMax)
progress := utils.NewProgress(background)
dspin := progress.AddDoubleSpinner("Warming up")
dspin := progress.AddDoubleSpinner(string(action))
total := &vfs.CacheResponse{}
for _, path := range paths {
if mp == "/" {
inode, err := utils.GetFileInode(path)
@@ -234,18 +278,30 @@ func warmup(ctx *cli.Context) error {
continue
}
if len(batch) >= batchMax {
sendCommand(controller, batch, threads, background, dspin)
resp := sendCommand(controller, action, batch, threads, background, dspin)
total.Add(resp)
batch = batch[:0]
}
}
if len(batch) > 0 {
sendCommand(controller, batch, threads, background, dspin)
resp := sendCommand(controller, action, batch, threads, background, dspin)
total.Add(resp)
}
progress.Done()

if !background {
count, bytes := dspin.Current()
logger.Infof("Successfully warmed up %d files (%d bytes)", count, bytes)
switch action {
case vfs.WarmupCache:
logger.Infof("%s: %d files (%s bytes)", action, count, humanize.IBytes(uint64(bytes)))
case vfs.EvictCache:
logger.Infof("%s: %d files (%s bytes)", action, count, humanize.IBytes(uint64(bytes)))
case vfs.CheckCache:
logger.Infof("%s: %d files (%s of %s (%2.1f%%)) cached", action, count,
humanize.IBytes(uint64(bytes)-total.MissBytes),
humanize.IBytes(uint64(bytes)),
float64(uint64(bytes)-total.MissBytes)*100/float64(bytes))
}
}

return nil
}
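
The per-batch responses are merged into a single vfs.CacheResponse. Its definition is not part of this diff; based on how it is used above (total.Add(resp), total.MissBytes), a hypothetical shape could look like the sketch below — only MissBytes is actually visible here, the other fields and the Add method are assumptions for illustration.

type CacheResponse struct {
	FileCount  uint64 // files handled by the mount process (assumed field)
	TotalBytes uint64 // bytes scanned (assumed field)
	MissBytes  uint64 // bytes not found in the local cache; referenced as total.MissBytes above
}

func (r *CacheResponse) Add(other CacheResponse) {
	r.FileCount += other.FileCount
	r.TotalBytes += other.TotalBytes
	r.MissBytes += other.MissBytes
}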
24 changes: 24 additions & 0 deletions pkg/chunk/cached_store.go
@@ -1033,6 +1033,30 @@ func (store *cachedStore) FillCache(id uint64, length uint32) error {
return err
}

func (store *cachedStore) EvictCache(id uint64, length uint32) error {
r := sliceForRead(id, int(length), store)
keys := r.keys()
for _, k := range keys {
store.bcache.remove(k)
}
return nil
}

func (store *cachedStore) CheckCache(id uint64, length uint32) (uint64, error) {
r := sliceForRead(id, int(length), store)
keys := r.keys()
missBytes := uint64(0)
for i, k := range keys {
tmpReader, err := store.bcache.load(k)
if err == nil {
_ = tmpReader.Close()
continue
}
missBytes += uint64(r.blockSize(i))
}
return missBytes, nil
}

func (store *cachedStore) UsedMemory() int64 {
return store.bcache.usedMemory()
}
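CheckCache walks the block keys of a slice and counts the bytes whose blocks are missing from the local cache, while EvictCache simply removes those keys. A caller can roll the per-slice results up into an overall hit ratio; the sketch below is illustrative and assumes a chunk.ChunkStore value plus a map of slice ids to lengths.

func cacheHitRatio(store chunk.ChunkStore, slices map[uint64]uint32) (float64, error) {
	var total, miss uint64
	for id, length := range slices {
		m, err := store.CheckCache(id, length)
		if err != nil {
			return 0, err
		}
		total += uint64(length)
		miss += m
	}
	if total == 0 {
		return 1, nil // nothing to check counts as fully cached
	}
	return float64(total-miss) / float64(total), nil
}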
24 changes: 24 additions & 0 deletions pkg/chunk/cached_store_test.go
@@ -27,6 +27,7 @@ import (
"time"

"github.com/juicedata/juicefs/pkg/object"
"github.com/stretchr/testify/assert"
)

func forgetSlice(store ChunkStore, sliceId uint64, size int) error {
@@ -246,6 +247,29 @@ func TestFillCache(t *testing.T) {
if cnt, used := bcache.stats(); cnt != 2 || used != expect {
t.Fatalf("cache cnt %d used %d, expect cnt 2 used %d", cnt, used, expect)
}

// check
missBytes, err := store.CheckCache(10, 1024)
assert.Nil(t, err)
assert.Equal(t, uint64(0), missBytes)

missBytes, err = store.CheckCache(11, uint32(bsize))
assert.Nil(t, err)
assert.Equal(t, uint64(0), missBytes)

// evict slice 11
err = store.EvictCache(11, uint32(bsize))
assert.Nil(t, err)

// stat
if cnt, used := bcache.stats(); cnt != 1 || used != 1024+4096 { // only chunk 10 cached
t.Fatalf("cache cnt %d used %d, expect cnt 1 used 5120", cnt, used)
}

// check again
missBytes, err = store.CheckCache(11, uint32(bsize))
assert.Nil(t, err)
assert.Equal(t, uint64(bsize), missBytes)
}

func BenchmarkCachedRead(b *testing.B) {
2 changes: 2 additions & 0 deletions pkg/chunk/chunk.go
@@ -39,6 +39,8 @@ type ChunkStore interface {
NewWriter(id uint64) Writer
Remove(id uint64, length int) error
FillCache(id uint64, length uint32) error
EvictCache(id uint64, length uint32) error
CheckCache(id uint64, length uint32) (uint64, error)
UsedMemory() int64
UpdateLimit(upload, download int64)
}
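
Because the interface grows by two methods, every ChunkStore implementation (including any test doubles outside this diff) must now provide them. A minimal sketch of a stub, assuming a hypothetical fakeStore type backed by a map of cached slice lengths:

type fakeStore struct {
	cached map[uint64]uint32 // slice id -> cached length (hypothetical test double)
}

func (s *fakeStore) EvictCache(id uint64, length uint32) error {
	delete(s.cached, id)
	return nil
}

func (s *fakeStore) CheckCache(id uint64, length uint32) (uint64, error) {
	if l, ok := s.cached[id]; ok && l >= length {
		return 0, nil // fully cached, no missing bytes
	}
	return uint64(length), nil // treat anything else as a full miss
}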