Skip to content

Commit

Permalink
Ceph driver fixes (#4200)
Browse files Browse the repository at this point in the history
* Do not cd to user home if user homes are disabled

* Simplify uploads

* Stat shadow folders before creating them

* Clean up chunked upload

* Fix panic on shutdown

* Changelog

* Check type before cache eviction

Co-authored-by: Gianmaria Del Monte <g.macmount@gmail.com>

* Continue on stat permission denied

---------

Co-authored-by: Gianmaria Del Monte <g.macmount@gmail.com>
  • Loading branch information
javfg and gmgigi96 authored Oct 9, 2023
1 parent 6161ac3 commit d36bcd5
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 455 deletions.
9 changes: 9 additions & 0 deletions changelog/unreleased/fix-ceph-driver.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
Enhancement: Multiple fixes for Ceph driver

* Avoid usage/creation of user homes when they are disabled in the config
* Simplify the regular uploads (not chunked)
* Avoid creation of shadow folders at the root if they are already there
* Clean up the chunked upload
* Fix panic on shutdown

https://github.com/cs3org/reva/pull/4200
11 changes: 7 additions & 4 deletions pkg/storage/fs/cephfs/cephfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func init() {
registry.Register("cephfs", New)
}

// New returns an implementation to of the storage.FS interface that talk to
// New returns an implementation to of the storage.FS interface that talks to
// a ceph filesystem.
func New(ctx context.Context, m map[string]interface{}) (fs storage.FS, err error) {
var o Options
Expand All @@ -81,9 +81,12 @@ func New(ctx context.Context, m map[string]interface{}) (fs storage.FS, err erro
}

for _, dir := range []string{o.ShadowFolder, o.UploadFolder} {
err = adminConn.adminMount.MakeDir(dir, dirPermFull)
if err != nil && err.Error() != errFileExists {
return nil, errors.New("cephfs: can't initialise system dir " + dir + ":" + err.Error())
_, err := adminConn.adminMount.Statx(dir, cephfs2.StatxMask(cephfs2.StatxIno), 0)
if err != nil {
err = adminConn.adminMount.MakeDir(dir, dirPermFull)
if err != nil && err.Error() != errFileExists {
return nil, errors.New("cephfs: can't initialise system dir " + dir + ":" + err.Error())
}
}
}

Expand Down
122 changes: 1 addition & 121 deletions pkg/storage/fs/cephfs/chunking.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,7 @@ func (c *ChunkHandler) getChunkFolderName(i *ChunkBLOBInfo) (path string, err er
}

func (c *ChunkHandler) saveChunk(path string, r io.ReadCloser) (finish bool, chunk string, err error) {
var chunkInfo *ChunkBLOBInfo

chunkInfo, err = GetChunkBLOBInfo(path)
chunkInfo, err := GetChunkBLOBInfo(path)
if err != nil {
err = fmt.Errorf("error getting chunk info from path: %s", path)
return
Expand Down Expand Up @@ -223,122 +221,4 @@ func (c *ChunkHandler) WriteChunk(fn string, r io.ReadCloser) (string, string, e
}

return chunkInfo.Path, chunk, nil

// TODO(labkode): implement old chunking

/*
req2 := &provider.StartWriteSessionRequest{}
res2, err := client.StartWriteSession(ctx, req2)
if err != nil {
logger.Error(ctx, err)
w.WriteHeader(http.StatusInternalServerError)
return
}
if res2.Status.Code != rpc.Code_CODE_OK {
logger.Println(ctx, res2.Status)
w.WriteHeader(http.StatusInternalServerError)
return
}
sessID := res2.SessionId
logger.Build().Str("sessID", sessID).Msg(ctx, "got write session id")
stream, err := client.Write(ctx)
if err != nil {
logger.Error(ctx, err)
w.WriteHeader(http.StatusInternalServerError)
return
}
buffer := make([]byte, 1024*1024*3)
var offset uint64
var numChunks uint64
for {
n, err := fd.Read(buffer)
if n > 0 {
req := &provider.WriteRequest{Data: buffer, Length: uint64(n), SessionId: sessID, Offset: offset}
err = stream.Send(req)
if err != nil {
logger.Error(ctx, err)
w.WriteHeader(http.StatusInternalServerError)
return
}
numChunks++
offset += uint64(n)
}
if err == io.EOF {
break
}
if err != nil {
logger.Error(ctx, err)
w.WriteHeader(http.StatusInternalServerError)
return
}
}
res3, err := stream.CloseAndRecv()
if err != nil {
logger.Error(ctx, err)
w.WriteHeader(http.StatusInternalServerError)
return
}
if res3.Status.Code != rpc.Code_CODE_OK {
logger.Println(ctx, err)
w.WriteHeader(http.StatusInternalServerError)
return
}
req4 := &provider.FinishWriteSessionRequest{Filename: chunkInfo.path, SessionId: sessID}
res4, err := client.FinishWriteSession(ctx, req4)
if err != nil {
logger.Error(ctx, err)
w.WriteHeader(http.StatusInternalServerError)
return
}
if res4.Status.Code != rpc.Code_CODE_OK {
logger.Println(ctx, res4.Status)
w.WriteHeader(http.StatusInternalServerError)
return
}
req.Filename = chunkInfo.path
res, err = client.Stat(ctx, req)
if err != nil {
logger.Error(ctx, err)
w.WriteHeader(http.StatusInternalServerError)
return
}
if res.Status.Code != rpc.Code_CODE_OK {
logger.Println(ctx, res.Status)
w.WriteHeader(http.StatusInternalServerError)
return
}
md2 := res.Metadata
w.Header().Add("Content-Type", md2.Mime)
w.Header().Set("ETag", md2.Etag)
w.Header().Set("OC-FileId", md2.Id)
w.Header().Set("OC-ETag", md2.Etag)
t := time.Unix(int64(md2.Mtime), 0)
lastModifiedString := t.Format(time.RFC1123Z)
w.Header().Set("Last-Modified", lastModifiedString)
w.Header().Set("X-OC-MTime", "accepted")
if md == nil {
w.WriteHeader(http.StatusCreated)
return
}
w.WriteHeader(http.StatusNoContent)
return
*/
}
11 changes: 6 additions & 5 deletions pkg/storage/fs/cephfs/connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,11 @@ func newCache() (c *connections, err error) {
MaxCost: usrLimit,
BufferItems: 64,
OnEvict: func(item *ristretto.Item) {
v := item.Value.(cacheVal)
v.perm.Destroy()
_ = v.mount.Unmount()
_ = v.mount.Release()
if v, ok := item.Value.(*cacheVal); ok {
v.perm.Destroy()
_ = v.mount.Unmount()
_ = v.mount.Release()
}
},
})
if err != nil {
Expand Down Expand Up @@ -212,7 +213,7 @@ func newConn(user *User) *cacheVal {
return destroyCephConn(mount, perm)
}

if user != nil {
if user != nil && !user.fs.conf.DisableHome {
if err = mount.ChangeDir(user.fs.conf.Root); err != nil {
return destroyCephConn(mount, perm)
}
Expand Down
Loading

0 comments on commit d36bcd5

Please sign in to comment.