From df9b7fabc134e1131bb8151650b59f5caf73d455 Mon Sep 17 00:00:00 2001 From: jiefeng Date: Fri, 19 Jan 2024 15:55:17 +0800 Subject: [PATCH 1/8] feat: cmd/warmup add check and evict option Signed-off-by: jiefeng --- cmd/warmup.go | 78 +++++++++++++--- pkg/chunk/cached_store.go | 24 +++++ pkg/chunk/chunk.go | 2 + pkg/vfs/fill.go | 190 ++++++++++++++++++++++++++++++++++++++ pkg/vfs/internal.go | 41 +++++++- 5 files changed, 317 insertions(+), 18 deletions(-) diff --git a/cmd/warmup.go b/cmd/warmup.go index 8f2c7f3b3801..239753a79323 100644 --- a/cmd/warmup.go +++ b/cmd/warmup.go @@ -19,6 +19,7 @@ package cmd import ( "bufio" "encoding/binary" + "encoding/json" "fmt" "io" "os" @@ -28,8 +29,10 @@ import ( "syscall" "time" + "github.com/dustin/go-humanize" "github.com/juicedata/juicefs/pkg/meta" "github.com/juicedata/juicefs/pkg/utils" + "github.com/juicedata/juicefs/pkg/vfs" "github.com/urfave/cli/v2" ) @@ -71,6 +74,14 @@ $ juicefs warmup -f /tmp/filelist`, Aliases: []string{"b"}, Usage: "run in background", }, + &cli.BoolFlag{ + Name: "evict", + Usage: "evict cached blocks", + }, + &cli.BoolFlag{ + Name: "check", + Usage: "check whether the data blocks are cached or not", + }, }, } } @@ -128,7 +139,7 @@ END: } // send fill-cache command to controller file -func sendCommand(cf *os.File, batch []string, threads uint, background bool, dspin *utils.DoubleSpinner) { +func sendCommand(cf *os.File, action vfs.CacheAction, batch []string, threads uint, background bool, dspin *utils.DoubleSpinner) vfs.CacheResponse { paths := strings.Join(batch, "\n") var back uint8 if background { @@ -144,19 +155,39 @@ func sendCommand(cf *os.File, batch []string, threads uint, background bool, dsp if _, err := cf.Write(wb.Bytes()); err != nil { logger.Fatalf("Write message: %s", err) } + + var resp vfs.CacheResponse if background { - logger.Infof("Warm-up cache for %d paths in background", len(batch)) - return + logger.Infof("%s for %d paths in background", action, len(batch)) + return resp } - if _, errno := readProgress(cf, func(count, bytes uint64) { - dspin.SetCurrent(int64(count), int64(bytes)) - }); errno != 0 { - logger.Fatalf("Warm up failed: %s", errno) + + lastCnt, lastBytes := dspin.Current() + data, errno := readProgress(cf, func(fileCount, totalBytes uint64) { + dspin.SetCurrent(lastCnt+int64(fileCount), lastBytes+int64(totalBytes)) + }) + + if errno != 0 { + logger.Fatalf("%s failed: %s", action, errno) + } + + err := json.Unmarshal(data, &resp) + if err != nil { + logger.Fatalf("unmarshal error: %s", err) } + + return resp } func warmup(ctx *cli.Context) error { setup(ctx, 0) + + evict := ctx.Bool("evict") + check := ctx.Bool("check") + if evict && check { + logger.Fatalf("--check and --evict can't be used together") + } + var paths []string for _, p := range ctx.Args().Slice() { if abs, err := filepath.Abs(p); err == nil { @@ -186,7 +217,7 @@ func warmup(ctx *cli.Context) error { } } if len(paths) == 0 { - logger.Infof("Nothing to warm up") + logger.Infof("no path") return nil } @@ -214,11 +245,20 @@ func warmup(ctx *cli.Context) error { logger.Warnf("threads should be larger than 0, reset it to 1") threads = 1 } + + action := vfs.WarmupCache + if evict { + action = vfs.EvictCache + } else if check { + action = vfs.CheckCache + } + background := ctx.Bool("background") start := len(mp) batch := make([]string, 0, batchMax) progress := utils.NewProgress(background) - dspin := progress.AddDoubleSpinner("Warming up") + dspin := progress.AddDoubleSpinner(string(action)) + total := &vfs.CacheResponse{} for 
_, path := range paths {
 		if mp == "/" {
 			inode, err := utils.GetFileInode(path)
@@ -234,18 +274,30 @@ func warmup(ctx *cli.Context) error {
 			continue
 		}
 		if len(batch) >= batchMax {
-			sendCommand(controller, batch, threads, background, dspin)
+			resp := sendCommand(controller, action, batch, threads, background, dspin)
+			total.Add(resp)
 			batch = batch[:0]
 		}
 	}
 	if len(batch) > 0 {
-		sendCommand(controller, batch, threads, background, dspin)
+		resp := sendCommand(controller, action, batch, threads, background, dspin)
+		total.Add(resp)
 	}
 	progress.Done()
+
 	if !background {
 		count, bytes := dspin.Current()
-		logger.Infof("Successfully warmed up %d files (%d bytes)", count, bytes)
+		switch action {
+		case vfs.WarmupCache:
+			logger.Infof("%s: %d files (%d bytes)", action, count, humanize.IBytes(uint64(bytes)))
+		case vfs.EvictCache:
+			logger.Infof("%s: %d files (%s bytes)", action, count, humanize.IBytes(uint64(bytes)))
+		case vfs.CheckCache:
+			logger.Infof("%s: %d files (%s of %s (%2.1f%%)) cached", action, count,
+				humanize.IBytes(uint64(bytes)-total.MissBytes),
+				humanize.IBytes(uint64(bytes)),
+				float64(uint64(bytes)-total.MissBytes)*100/float64(bytes))
+		}
 	}
-
 	return nil
}
diff --git a/pkg/chunk/cached_store.go b/pkg/chunk/cached_store.go
index 9f933a1ba163..dacf0367c800 100644
--- a/pkg/chunk/cached_store.go
+++ b/pkg/chunk/cached_store.go
@@ -1033,6 +1033,30 @@ func (store *cachedStore) FillCache(id uint64, length uint32) error {
 	return err
 }
 
+func (store *cachedStore) EvictCache(id uint64, length uint32) error {
+	r := sliceForRead(id, int(length), store)
+	keys := r.keys()
+	for _, k := range keys {
+		store.bcache.remove(k)
+	}
+	return nil
+}
+
+func (store *cachedStore) CheckCache(id uint64, length uint32) (uint64, error) {
+	r := sliceForRead(id, int(length), store)
+	keys := r.keys()
+	missBytes := uint64(0)
+	for i, k := range keys {
+		tmpReader, err := store.bcache.load(k)
+		if err == nil {
+			_ = tmpReader.Close()
+			continue
+		}
+		missBytes += uint64(r.blockSize(i))
+	}
+	return missBytes, nil
+}
+
 func (store *cachedStore) UsedMemory() int64 {
 	return store.bcache.usedMemory()
 }
diff --git a/pkg/chunk/chunk.go b/pkg/chunk/chunk.go
index fd86be5acb54..45d7c4ee0faa 100644
--- a/pkg/chunk/chunk.go
+++ b/pkg/chunk/chunk.go
@@ -39,6 +39,8 @@ type ChunkStore interface {
 	NewWriter(id uint64) Writer
 	Remove(id uint64, length int) error
 	FillCache(id uint64, length uint32) error
+	EvictCache(id uint64, length uint32) error
+	CheckCache(id uint64, length uint32) (uint64, error)
 	UsedMemory() int64
 	UpdateLimit(upload, download int64)
 }
diff --git a/pkg/vfs/fill.go b/pkg/vfs/fill.go
index f153cb4685cd..4623da9ffd3e 100644
--- a/pkg/vfs/fill.go
+++ b/pkg/vfs/fill.go
@@ -34,6 +34,113 @@ type _file struct {
 	size uint64
 }
 
+type CacheAction uint8
+
+func (act CacheAction) String() string {
+	switch act {
+	case WarmupCache:
+		return "warmup cache"
+	case EvictCache:
+		return "evict cache"
+	case CheckCache:
+		return "check cache"
+	}
+	return "unknown operation"
+}
+
+const (
+	WarmupCache CacheAction = iota
+	EvictCache
+	CheckCache
+)
+
+func (v *VFS) cache(ctx meta.Context, action CacheAction, paths []string, concurrent int, resp *CacheResponse) {
+	logger.Infof("start to %s %d paths with %d workers", action, len(paths), concurrent)
+
+	start := time.Now()
+	todo := make(chan _file, 10240)
+	wg := sync.WaitGroup{}
+	for i := 0; i < concurrent; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for f := range todo {
+				if f.ino == 0 {
+					continue
+				}
+
+				iter := newSliceIterator(ctx, v.Meta, f.ino, 
f.size)
+				var handler sliceHandler
+				switch action {
+				case WarmupCache:
+					handler = func(s meta.Slice) error {
+						return v.Store.FillCache(s.Id, s.Size)
+					}
+
+					if v.Conf.Meta.OpenCache > 0 {
+						if err := v.Meta.Open(ctx, f.ino, syscall.O_RDONLY, &meta.Attr{}); err != 0 {
+							logger.Errorf("Inode %d could not be opened: %s", f.ino, err)
+						}
+						_ = v.Meta.Close(ctx, f.ino)
+					}
+				case EvictCache:
+					handler = func(s meta.Slice) error {
+						return v.Store.EvictCache(s.Id, s.Size)
+					}
+				case CheckCache:
+					handler = func(s meta.Slice) error {
+						missBytes, err := v.Store.CheckCache(s.Id, s.Size)
+						if err != nil {
+							return err
+						}
+						if resp != nil {
+							atomic.AddUint64(&resp.MissBytes, missBytes)
+						}
+						return nil
+					}
+				}
+
+				// log and skip error
+				err := iter.Iterate(handler)
+				if err != nil {
+					logger.Error(fmt.Errorf("%s error: %w", action, err))
+				}
+
+				if resp != nil {
+					atomic.AddUint64(&resp.FileCount, 1)
+					atomic.AddUint64(&resp.SliceCount, iter.stat.count)
+					atomic.AddUint64(&resp.TotalBytes, iter.stat.bytes)
+				}
+			}
+		}()
+	}
+
+	var inode Ino
+	var attr = &Attr{}
+	for _, p := range paths {
+		if st := v.resolve(ctx, p, &inode, attr); st != 0 {
+			logger.Warnf("Failed to resolve path %s: %s", p, st)
+			continue
+		}
+		logger.Debugf("%s path %s", action, p)
+		if attr.Typ == meta.TypeDirectory {
+			v.walkDir(ctx, inode, todo)
+		} else if attr.Typ == meta.TypeFile {
+			todo <- _file{inode, attr.Length}
+		}
+		if ctx.Canceled() {
+			break
+		}
+	}
+	close(todo)
+	wg.Wait()
+
+	if ctx.Canceled() {
+		logger.Infof("%s cancelled", action)
+	}
+	logger.Infof("%s %d paths in %s", action, len(paths), time.Since(start))
+}
+
 func (v *VFS) fillCache(ctx meta.Context, paths []string, concurrent int, count, bytes *uint64) {
 	logger.Infof("start to warmup %d paths with %d workers", len(paths), concurrent)
 	start := time.Now()
@@ -184,6 +291,89 @@ func (v *VFS) walkDir(ctx meta.Context, inode Ino, todo chan _file) {
 	}
 }
 
+type sliceIterStat struct {
+	count uint64
+	bytes uint64
+}
+
+type sliceIterator struct {
+	ctx      meta.Context
+	mClient  meta.Meta
+	ino      Ino
+	size     uint64
+	chunkCnt uint32
+	stat     sliceIterStat
+
+	err            error
+	nextChunkIndex uint32
+	nextSliceIndex uint64
+	slices         []meta.Slice
+}
+
+type sliceHandler func(s meta.Slice) error
+
+func (iter *sliceIterator) hasNext() bool {
+	if iter.ctx.Canceled() {
+		iter.err = iter.ctx.Err()
+		return false
+	}
+
+	for iter.nextSliceIndex >= uint64(len(iter.slices)) {
+		if iter.nextChunkIndex >= iter.chunkCnt {
+			return false
+		}
+
+		iter.slices = nil
+		iter.nextSliceIndex = 0
+		if st := iter.mClient.Read(iter.ctx, iter.ino, iter.nextChunkIndex, &iter.slices); st != 0 {
+			iter.err = fmt.Errorf("get slices of inode %d index %d error: %d", iter.ino, iter.nextChunkIndex, st)
+			return false
+		}
+		iter.nextChunkIndex++
+	}
+
+	return true
+}
+
+func (iter *sliceIterator) next() meta.Slice {
+	s := iter.slices[iter.nextSliceIndex]
+	iter.nextSliceIndex++
+	return s
+}
+
+func (iter *sliceIterator) Iterate(handler sliceHandler) error {
+	for iter.hasNext() {
+		s := iter.next()
+		iter.stat.count++
+		iter.stat.bytes += uint64(s.Size)
+		if handler == nil {
+			return fmt.Errorf("handler not set")
+		}
+		if err := handler(s); err != nil {
+			return fmt.Errorf("inode %d slice %d: %w", iter.ino, s.Id, err)
+		}
+	}
+	return iter.err
+}
+
+func (iter *sliceIterator) Stat() sliceIterStat {
+	return iter.stat
+}
+
+func newSliceIterator(ctx meta.Context, mClient meta.Meta, ino Ino, size uint64) *sliceIterator {
+	return &sliceIterator{
+		ctx:     ctx,
+		mClient: mClient,
+		ino:     ino,
+		size:    size,
+		
stat: sliceIterStat{}, + + nextSliceIndex: 0, + nextChunkIndex: 0, + chunkCnt: uint32((size + meta.ChunkSize - 1) / meta.ChunkSize), + } +} + func (v *VFS) fillInode(ctx meta.Context, inode Ino, size uint64, bytes *uint64) error { var slices []meta.Slice for indx := uint64(0); indx*meta.ChunkSize < size; indx++ { diff --git a/pkg/vfs/internal.go b/pkg/vfs/internal.go index 964674659e48..cb75e7298230 100644 --- a/pkg/vfs/internal.go +++ b/pkg/vfs/internal.go @@ -283,6 +283,20 @@ type SummaryReponse struct { Tree meta.TreeSummary } +type CacheResponse struct { + FileCount uint64 + SliceCount uint64 + TotalBytes uint64 + MissBytes uint64 // for check op +} + +func (resp *CacheResponse) Add(other CacheResponse) { + resp.FileCount += other.FileCount + resp.TotalBytes += other.TotalBytes + resp.SliceCount += other.SliceCount + resp.MissBytes += other.MissBytes +} + type chunkSlice struct { ChunkIndex uint64 meta.Slice @@ -526,18 +540,35 @@ func (v *VFS) handleInternalMsg(ctx meta.Context, cmd uint32, r *utils.Buffer, o paths := strings.Split(string(r.Get(int(r.Get32()))), "\n") concurrent := r.Get16() background := r.Get8() + + action := WarmupCache + if r.HasMore() { + action = CacheAction(r.Get8()) + } + + var stat CacheResponse if background == 0 { - var count, bytes uint64 done := make(chan struct{}) go func() { - v.fillCache(ctx, paths, int(concurrent), &count, &bytes) + v.cache(ctx, action, paths, int(concurrent), &stat) close(done) }() - writeProgress(&count, &bytes, out, done) + writeProgress(&stat.FileCount, &stat.TotalBytes, out, done) } else { - go v.fillCache(meta.NewContext(ctx.Pid(), ctx.Uid(), ctx.Gids()), paths, int(concurrent), nil, nil) + go v.cache(meta.NewContext(ctx.Pid(), ctx.Uid(), ctx.Gids()), action, paths, int(concurrent), nil) + } + + data, err := json.Marshal(stat) + if err != nil { + logger.Errorf("marshal response error: %v", err) + _, _ = out.Write([]byte{byte(syscall.EIO & 0xff)}) + return } - _, _ = out.Write([]byte{0}) + w := utils.NewBuffer(uint32(1 + 4 + len(data))) + w.Put8(meta.CDATA) + w.Put32(uint32(len(data))) + w.Put(data) + _, _ = out.Write(w.Bytes()) default: logger.Warnf("unknown message type: %d", cmd) _, _ = out.Write([]byte{byte(syscall.EINVAL & 0xff)}) From 8d587f78deea302d757e4da8d1fc9c4d76a2f2b1 Mon Sep 17 00:00:00 2001 From: jiefeng Date: Fri, 19 Jan 2024 16:00:34 +0800 Subject: [PATCH 2/8] feat: cmd/warmup remove dup code Signed-off-by: jiefeng --- pkg/vfs/fill.go | 79 -------------------------------------------- pkg/vfs/fill_test.go | 4 +-- 2 files changed, 2 insertions(+), 81 deletions(-) diff --git a/pkg/vfs/fill.go b/pkg/vfs/fill.go index 4623da9ffd3e..850ae074b9d3 100644 --- a/pkg/vfs/fill.go +++ b/pkg/vfs/fill.go @@ -141,64 +141,6 @@ func (v *VFS) cache(ctx meta.Context, action CacheAction, paths []string, concur logger.Infof("%s %d paths in %s", action, len(paths), time.Since(start)) } -func (v *VFS) fillCache(ctx meta.Context, paths []string, concurrent int, count, bytes *uint64) { - logger.Infof("start to warmup %d paths with %d workers", len(paths), concurrent) - start := time.Now() - todo := make(chan _file, 10240) - wg := sync.WaitGroup{} - for i := 0; i < concurrent; i++ { - wg.Add(1) - go func() { - for { - f := <-todo - if f.ino == 0 { - break - } - if err := v.fillInode(ctx, f.ino, f.size, bytes); err != nil { - logger.Errorf("Inode %d could be corrupted: %s", f.ino, err) - } - if v.Conf.Meta.OpenCache > 0 { - if err := v.Meta.Open(ctx, f.ino, syscall.O_RDONLY, &meta.Attr{}); err != 0 { - logger.Errorf("Inode %d could be 
opened: %s", f.ino, err) - } - _ = v.Meta.Close(ctx, f.ino) - } - if count != nil { - atomic.AddUint64(count, 1) - } - if ctx.Canceled() { - break - } - } - wg.Done() - }() - } - - var inode Ino - var attr = &Attr{} - for _, p := range paths { - if st := v.resolve(ctx, p, &inode, attr); st != 0 { - logger.Warnf("Failed to resolve path %s: %s", p, st) - continue - } - logger.Debugf("Warming up path %s", p) - if attr.Typ == meta.TypeDirectory { - v.walkDir(ctx, inode, todo) - } else if attr.Typ == meta.TypeFile { - todo <- _file{inode, attr.Length} - } - if ctx.Canceled() { - break - } - } - close(todo) - wg.Wait() - if ctx.Canceled() { - logger.Infof("warmup cancelled") - } - logger.Infof("Warmup %d paths in %s", len(paths), time.Since(start)) -} - func (v *VFS) resolve(ctx meta.Context, p string, inode *Ino, attr *Attr) syscall.Errno { var inodePrefix = "inode:" if strings.HasPrefix(p, inodePrefix) { @@ -373,24 +315,3 @@ func newSliceIterator(ctx meta.Context, mClient meta.Meta, ino Ino, size uint64) chunkCnt: uint32((size + meta.ChunkSize - 1) / meta.ChunkSize), } } - -func (v *VFS) fillInode(ctx meta.Context, inode Ino, size uint64, bytes *uint64) error { - var slices []meta.Slice - for indx := uint64(0); indx*meta.ChunkSize < size; indx++ { - if st := v.Meta.Read(ctx, inode, uint32(indx), &slices); st != 0 { - return fmt.Errorf("Failed to get slices of inode %d index %d: %d", inode, indx, st) - } - for _, s := range slices { - if bytes != nil { - atomic.AddUint64(bytes, uint64(s.Size)) - } - if err := v.Store.FillCache(s.Id, s.Size); err != nil { - return fmt.Errorf("Failed to cache inode %d slice %d: %s", inode, s.Id, err) - } - if ctx.Canceled() { - return syscall.EINTR - } - } - } - return nil -} diff --git a/pkg/vfs/fill_test.go b/pkg/vfs/fill_test.go index 967f5769f9b0..db720940c501 100644 --- a/pkg/vfs/fill_test.go +++ b/pkg/vfs/fill_test.go @@ -36,7 +36,7 @@ func TestFill(t *testing.T) { _, _ = v.Symlink(ctx, "testfile", 1, "sym3") // normal cases - v.fillCache(meta.Background, []string{"/test/file", "/test", "/sym", "/"}, 2, nil, nil) + v.cache(meta.Background, WarmupCache, []string{"/test/file", "/test", "/sym", "/"}, 2, nil) // remove chunk var slices []meta.Slice @@ -45,5 +45,5 @@ func TestFill(t *testing.T) { _ = v.Store.Remove(s.Id, int(s.Size)) } // bad cases - v.fillCache(meta.Background, []string{"/test/file", "/sym2", "/sym3", "/.stats", "/not_exists"}, 2, nil, nil) + v.cache(meta.Background, WarmupCache, []string{"/test/file", "/sym2", "/sym3", "/.stats", "/not_exists"}, 2, nil) } From 9edd85fe8558170840f7f6d32a08746039dfb3c0 Mon Sep 17 00:00:00 2001 From: jiefeng Date: Fri, 19 Jan 2024 16:34:08 +0800 Subject: [PATCH 3/8] fix: test Signed-off-by: jiefeng --- cmd/warmup.go | 2 +- pkg/vfs/fill.go | 2 +- pkg/vfs/vfs_test.go | 10 ++++++---- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/cmd/warmup.go b/cmd/warmup.go index 239753a79323..52aa92351ba1 100644 --- a/cmd/warmup.go +++ b/cmd/warmup.go @@ -289,7 +289,7 @@ func warmup(ctx *cli.Context) error { count, bytes := dspin.Current() switch action { case vfs.WarmupCache: - logger.Infof("%s: %d files (%d bytes)", action, count, humanize.IBytes(uint64(bytes))) + logger.Infof("%s: %d files (%s bytes)", action, count, humanize.IBytes(uint64(bytes))) case vfs.EvictCache: logger.Infof("%s: %d files (%s bytes)", action, count, humanize.IBytes(uint64(bytes))) case vfs.CheckCache: diff --git a/pkg/vfs/fill.go b/pkg/vfs/fill.go index 850ae074b9d3..f480ff99838e 100644 --- a/pkg/vfs/fill.go +++ b/pkg/vfs/fill.go @@ -58,7 
+58,7 @@ func (v *VFS) cache(ctx meta.Context, action CacheAction, paths []string, concur logger.Infof("start to %s %d paths with %d workers", action, len(paths), concurrent) start := time.Now() - todo := make(chan _file, 10240) + todo := make(chan _file, 2*concurrent) wg := sync.WaitGroup{} for i := 0; i < concurrent; i++ { wg.Add(1) diff --git a/pkg/vfs/vfs_test.go b/pkg/vfs/vfs_test.go index a35ae77381dd..aa3d9e169c25 100644 --- a/pkg/vfs/vfs_test.go +++ b/pkg/vfs/vfs_test.go @@ -837,11 +837,13 @@ func TestInternalFile(t *testing.T) { } off += uint64(len(buf)) resp = make([]byte, 1024*10) - if n, e = readControl(resp, &off); e != 0 || n != 1 { - t.Fatalf("read result: %s %d", e, n) - } else if resp[0] != 0 { - t.Fatalf("fill result: %s", string(buf[:n])) + + data, _ = json.Marshal(CacheResponse{}) + expectSize := 1 + 4 + len(data) + if n, e = readControl(resp, &off); e != 0 || n != expectSize { + t.Fatalf("read result: %s %d %d", e, n, expectSize) } + off += uint64(n) // invalid msg From a8b6e8d360e19f2e6c445bef51851b4bd56aa715 Mon Sep 17 00:00:00 2001 From: jiefeng Date: Fri, 19 Jan 2024 17:20:02 +0800 Subject: [PATCH 4/8] fix: send file nonblocking, in case workers exit first Signed-off-by: jiefeng --- pkg/vfs/fill.go | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/pkg/vfs/fill.go b/pkg/vfs/fill.go index f480ff99838e..c13ec8b1a120 100644 --- a/pkg/vfs/fill.go +++ b/pkg/vfs/fill.go @@ -58,13 +58,17 @@ func (v *VFS) cache(ctx meta.Context, action CacheAction, paths []string, concur logger.Infof("start to %s %d paths with %d workers", action, len(paths), concurrent) start := time.Now() - todo := make(chan _file, 2*concurrent) + todo := make(chan _file, 10*concurrent) wg := sync.WaitGroup{} for i := 0; i < concurrent; i++ { wg.Add(1) go func() { defer wg.Done() for f := range todo { + if ctx.Canceled() { + return + } + if f.ino == 0 { continue } @@ -126,7 +130,7 @@ func (v *VFS) cache(ctx meta.Context, action CacheAction, paths []string, concur if attr.Typ == meta.TypeDirectory { v.walkDir(ctx, inode, todo) } else if attr.Typ == meta.TypeFile { - todo <- _file{inode, attr.Length} + _ = sendFile(ctx, todo, _file{inode, attr.Length}) } if ctx.Canceled() { break @@ -141,6 +145,15 @@ func (v *VFS) cache(ctx meta.Context, action CacheAction, paths []string, concur logger.Infof("%s %d paths in %s", action, len(paths), time.Since(start)) } +func sendFile(ctx meta.Context, todo chan _file, f _file) error { + select { + case todo <- f: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + func (v *VFS) resolve(ctx meta.Context, p string, inode *Ino, attr *Attr) syscall.Errno { var inodePrefix = "inode:" if strings.HasPrefix(p, inodePrefix) { @@ -221,7 +234,7 @@ func (v *VFS) walkDir(ctx meta.Context, inode Ino, todo chan _file) { if f.Attr.Typ == meta.TypeDirectory { pending = append(pending, f.Inode) } else if f.Attr.Typ != meta.TypeSymlink { - todo <- _file{f.Inode, f.Attr.Length} + _ = sendFile(ctx, todo, _file{f.Inode, f.Attr.Length}) } if ctx.Canceled() { return From f902af8272d6dc250fffafee18b1c7d6e8424413 Mon Sep 17 00:00:00 2001 From: jiefeng Date: Mon, 22 Jan 2024 14:54:37 +0800 Subject: [PATCH 5/8] fix: cmd input Signed-off-by: jiefeng --- cmd/warmup.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/cmd/warmup.go b/cmd/warmup.go index 52aa92351ba1..a9f28f276bd6 100644 --- a/cmd/warmup.go +++ b/cmd/warmup.go @@ -145,13 +145,17 @@ func sendCommand(cf *os.File, action vfs.CacheAction, batch []string, threads ui if 
background {
 		back = 1
 	}
-	wb := utils.NewBuffer(8 + 4 + 3 + uint32(len(paths)))
+	headerLen, bodyLen := uint32(8), uint32(4+len(paths)+2+1+1)
+	wb := utils.NewBuffer(headerLen + bodyLen)
 	wb.Put32(meta.FillCache)
-	wb.Put32(4 + 3 + uint32(len(paths)))
+	wb.Put32(bodyLen)
+
 	wb.Put32(uint32(len(paths)))
 	wb.Put([]byte(paths))
 	wb.Put16(uint16(threads))
 	wb.Put8(back)
+	wb.Put8(uint8(action))
+
 	if _, err := cf.Write(wb.Bytes()); err != nil {
 		logger.Fatalf("Write message: %s", err)
 	}

From f00f0009990985e424b4580edeaff9b69ca6baf7 Mon Sep 17 00:00:00 2001
From: jiefeng
Date: Mon, 22 Jan 2024 15:22:29 +0800
Subject: [PATCH 6/8] test: add test for evict and check cache

Signed-off-by: jiefeng
---
 pkg/chunk/cached_store_test.go | 24 ++++++++++++++++++++++++
 1 file changed, 24 insertions(+)

diff --git a/pkg/chunk/cached_store_test.go b/pkg/chunk/cached_store_test.go
index 2e2dbcf79874..154d5fc3602b 100644
--- a/pkg/chunk/cached_store_test.go
+++ b/pkg/chunk/cached_store_test.go
@@ -27,6 +27,7 @@ import (
 	"time"
 
 	"github.com/juicedata/juicefs/pkg/object"
+	"github.com/stretchr/testify/assert"
 )
 
 func forgetSlice(store ChunkStore, sliceId uint64, size int) error {
@@ -246,6 +247,29 @@ func TestFillCache(t *testing.T) {
 	if cnt, used := bcache.stats(); cnt != 2 || used != expect {
 		t.Fatalf("cache cnt %d used %d, expect cnt 2 used %d", cnt, used, expect)
 	}
+
+	// check
+	missBytes, err := store.CheckCache(10, 1024)
+	assert.Nil(t, err)
+	assert.Equal(t, uint64(0), missBytes)
+
+	missBytes, err = store.CheckCache(11, uint32(bsize))
+	assert.Nil(t, err)
+	assert.Equal(t, uint64(0), missBytes)
+
+	// evict slice 11
+	err = store.EvictCache(11, uint32(bsize))
+	assert.Nil(t, err)
+
+	// stat
+	if cnt, used := bcache.stats(); cnt != 1 || used != 1024+4096 { // only chunk 10 cached
+		t.Fatalf("cache cnt %d used %d, expect cnt 1 used 5120", cnt, used)
+	}
+
+	// check again
+	missBytes, err = store.CheckCache(11, uint32(bsize))
+	assert.Nil(t, err)
+	assert.Equal(t, uint64(bsize), missBytes)
 }
 
 func BenchmarkCachedRead(b *testing.B) {

From 28e42a2d19b3ab73f197938923a0a5590b3a6a12 Mon Sep 17 00:00:00 2001
From: jiefeng
Date: Mon, 22 Jan 2024 21:07:52 +0800
Subject: [PATCH 7/8] refactor: based on review feedback

Signed-off-by: jiefeng
---
 cmd/warmup.go   |  2 +-
 pkg/vfs/fill.go | 11 ++++++-----
 2 files changed, 7 insertions(+), 6 deletions(-)

diff --git a/cmd/warmup.go b/cmd/warmup.go
index a9f28f276bd6..644791519db2 100644
--- a/cmd/warmup.go
+++ b/cmd/warmup.go
@@ -261,7 +261,7 @@ func warmup(ctx *cli.Context) error {
 	start := len(mp)
 	batch := make([]string, 0, batchMax)
 	progress := utils.NewProgress(background)
-	dspin := progress.AddDoubleSpinner(string(action))
+	dspin := progress.AddDoubleSpinner(action.String())
 	total := &vfs.CacheResponse{}
 	for _, path := range paths {
 		if mp == "/" {
diff --git a/pkg/vfs/fill.go b/pkg/vfs/fill.go
index c13ec8b1a120..548c2957c3bc 100644
--- a/pkg/vfs/fill.go
+++ b/pkg/vfs/fill.go
@@ -70,7 +70,8 @@ func (v *VFS) cache(ctx meta.Context, action CacheAction, paths []string, concur
 				}
 
 				if f.ino == 0 {
-					continue
+					ctx.Cancel()
+					return
 				}
 
 				iter := newSliceIterator(ctx, v.Meta, f.ino, f.size)
@@ -107,7 +108,7 @@ func (v *VFS) cache(ctx meta.Context, action CacheAction, paths []string, concur
 				// log and skip error
 				err := iter.Iterate(handler)
 				if err != nil {
-					logger.Error(fmt.Errorf("%s error: %w", action, err))
+					logger.Errorf("%s error: %s", action, err)
 				}
 
 				if resp != nil {
@@ -297,13 +298,13 @@ func (iter *sliceIterator) next() meta.Slice {
 }
 
 func (iter *sliceIterator) Iterate(handler 
sliceHandler) error {
+	if handler == nil {
+		return fmt.Errorf("handler not set")
+	}
 	for iter.hasNext() {
 		s := iter.next()
 		iter.stat.count++
 		iter.stat.bytes += uint64(s.Size)
-		if handler == nil {
-			return fmt.Errorf("handler not set")
-		}
 		if err := handler(s); err != nil {
 			return fmt.Errorf("inode %d slice %d: %w", iter.ino, s.Id, err)
 		}

From 02dad7ed22b5434825860c938ccbd9688edeaf2d Mon Sep 17 00:00:00 2001
From: jiefeng
Date: Tue, 23 Jan 2024 10:36:17 +0800
Subject: [PATCH 8/8] refactor: based on review feedback

Signed-off-by: jiefeng
---
 pkg/vfs/fill.go | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/pkg/vfs/fill.go b/pkg/vfs/fill.go
index 548c2957c3bc..fe4e0fe5f084 100644
--- a/pkg/vfs/fill.go
+++ b/pkg/vfs/fill.go
@@ -70,8 +70,8 @@ func (v *VFS) cache(ctx meta.Context, action CacheAction, paths []string, concur
 				}
 
 				if f.ino == 0 {
-					ctx.Cancel()
-					return
+					logger.Warnf("%s got inode 0", action)
+					continue
 				}
 
 				iter := newSliceIterator(ctx, v.Meta, f.ino, f.size)
@@ -256,7 +256,6 @@ type sliceIterator struct {
 	ctx      meta.Context
 	mClient  meta.Meta
 	ino      Ino
-	size     uint64
 	chunkCnt uint32
 	stat     sliceIterStat
 
@@ -321,7 +320,6 @@ func newSliceIterator(ctx meta.Context, mClient meta.Meta, ino Ino, size uint64)
 		ctx:     ctx,
 		mClient: mClient,
 		ino:     ino,
-		size:    size,
 		stat:    sliceIterStat{},
 
 		nextSliceIndex: 0,