Skip to content

Commit

Permalink
storage: improve concurrency on in-memory storage (#50)
Browse files Browse the repository at this point in the history
  • Loading branch information
nicks committed Nov 10, 2021
1 parent db850c3 commit 163262f
Showing 1 changed file with 48 additions and 17 deletions.
65 changes: 48 additions & 17 deletions pkg/storage/filepath/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,13 @@ func (fs *MemoryFS) EnsureDir(dirname string) error {

// Write a copy of the object to our in-memory filesystem.
func (fs *MemoryFS) Write(encoder runtime.Encoder, p string, obj runtime.Object) error {
// Encoding the object as bytes ensures that our in-memory filesystem
// has the same immutability semantics as a real storage system.
buf := new(bytes.Buffer)
if err := encoder.Encode(obj, buf); err != nil {
return err
}

fs.mu.Lock()
defer fs.mu.Unlock()

Expand All @@ -175,24 +182,22 @@ func (fs *MemoryFS) Write(encoder runtime.Encoder, p string, obj runtime.Object)
return err
}

// Encoding the object as bytes ensures that our in-memory filesystem
// has the same immutability semantics as a real storage system.
buf := new(bytes.Buffer)
if err := encoder.Encode(obj, buf); err != nil {
return err
}
dir[filepath.Base(p)] = buf
return nil
}

// Read a copy of the object from our in-memory filesystem.
func (fs *MemoryFS) Read(decoder runtime.Decoder, p string, newFunc func() runtime.Object) (runtime.Object, error) {
fs.mu.Lock()
defer fs.mu.Unlock()
return fs.readInternal(decoder, p, newFunc)
buf, err := fs.readBuffer(p)
fs.mu.Unlock()
if err != nil {
return nil, err
}
return fs.decodeBuffer(decoder, buf, newFunc)
}

func (fs *MemoryFS) readInternal(decoder runtime.Decoder, p string, newFunc func() runtime.Object) (runtime.Object, error) {
func (fs *MemoryFS) readBuffer(p string) (*bytes.Buffer, error) {
dir, err := fs.ensureDir(filepath.Dir(p))
if err != nil {
return nil, err
Expand All @@ -206,7 +211,10 @@ func (fs *MemoryFS) readInternal(decoder runtime.Decoder, p string, newFunc func
if !ok {
return nil, os.ErrNotExist
}
return buf, nil
}

func (fs *MemoryFS) decodeBuffer(decoder runtime.Decoder, buf *bytes.Buffer, newFunc func() runtime.Object) (runtime.Object, error) {
newObj := newFunc()
decodedObj, _, err := decoder.Decode(buf.Bytes(), nil, newObj)
if err != nil {
Expand All @@ -218,13 +226,37 @@ func (fs *MemoryFS) readInternal(decoder runtime.Decoder, p string, newFunc func
// Walk the directory, reading all objects in it.
func (fs *MemoryFS) VisitDir(dirname string, newFunc func() runtime.Object, codec runtime.Decoder, visitFunc func(string, runtime.Object) error) error {
fs.mu.Lock()
defer fs.mu.Unlock()
keyPaths, buffers, err := fs.readDir(dirname)
fs.mu.Unlock()
if err != nil {
return err
}

// Do decoding and visitation outside the lock.
for i, keyPath := range keyPaths {
buf := buffers[i]
obj, err := fs.decodeBuffer(codec, buf, newFunc)
if err != nil {
return err
}
err = visitFunc(keyPath, obj)
if err != nil {
return err
}
}
return nil
}

// Internal helper for reading the directory. Must hold the mutex.
func (fs *MemoryFS) readDir(dirname string) ([]string, []*bytes.Buffer, error) {
dir, err := fs.ensureDir(dirname)
if err != nil {
return err
return nil, nil, err
}

keyPaths := []string{}
buffers := []*bytes.Buffer{}

var walk func(ancestorPath string, dir map[string]interface{}) error
walk = func(ancestorPath string, dir map[string]interface{}) error {
for key, val := range dir {
Expand All @@ -242,17 +274,16 @@ func (fs *MemoryFS) VisitDir(dirname string, newFunc func() runtime.Object, code
continue
}

newObj, err := fs.readInternal(codec, keyPath, newFunc)
if err != nil {
return err
}
err = visitFunc(keyPath, newObj)
newBuf, err := fs.readBuffer(keyPath)
if err != nil {
return err
}
keyPaths = append(keyPaths, keyPath)
buffers = append(buffers, newBuf)
}
return nil
}

return walk(dirname, dir)
err = walk(dirname, dir)
return keyPaths, buffers, err
}

0 comments on commit 163262f

Please sign in to comment.