Skip to content

Commit

Permalink
Add logic in EOS FS for maintaining same inode across file versions (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ishank011 authored Sep 30, 2020
1 parent 1df6932 commit a80c395
Show file tree
Hide file tree
Showing 10 changed files with 178 additions and 50 deletions.
8 changes: 8 additions & 0 deletions changelog/unreleased/eos-version-invariant.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
Enhancement: Add logic in EOS FS for maintaining same inode across file versions

This PR adds the functionality to maintain the same inode across various
versions of a file by returning the inode of the version folder which remains
constant. It requires extra metadata operations so a flag is provided to disable
it.

https://github.com/cs3org/reva/pull/1174.
10 changes: 9 additions & 1 deletion docs/content/en/docs/config/packages/storage/fs/eos/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,16 @@ use_keytab = false
{{< /highlight >}}
{{% /dir %}}

{{% dir name="version_invariant" type="bool" default=true %}}
Whether to maintain the same inode across various versions of a file. Requires extra metadata operations if set to true [[Ref]](https://github.com/cs3org/reva/tree/master/pkg/storage/fs/eos/eos.go#L95)
{{< highlight toml >}}
[storage.fs.eos]
version_invariant = true
{{< /highlight >}}
{{% /dir %}}

{{% dir name="gatewaysvc" type="string" default="0.0.0.0:19000" %}}
GatewaySvc stores the endpoint at which the GRPC gateway is exposed. [[Ref]](https://github.com/cs3org/reva/tree/master/pkg/storage/fs/eos/eos.go#L94)
GatewaySvc stores the endpoint at which the GRPC gateway is exposed. [[Ref]](https://github.com/cs3org/reva/tree/master/pkg/storage/fs/eos/eos.go#L98)
{{< highlight toml >}}
[storage.fs.eos]
gatewaysvc = "0.0.0.0:19000"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,16 @@ use_keytab = false
{{< /highlight >}}
{{% /dir %}}

{{% dir name="version_invariant" type="bool" default=true %}}
Whether to maintain the same inode across various versions of a file. Requires extra metadata operations if set to true [[Ref]](https://github.com/cs3org/reva/tree/master/pkg/storage/fs/eoshome/eoshome.go#L101)
{{< highlight toml >}}
[storage.fs.eoshome]
version_invariant = true
{{< /highlight >}}
{{% /dir %}}

{{% dir name="gatewaysvc" type="string" default="0.0.0.0:19000" %}}
GatewaySvc stores the endpoint at which the GRPC gateway is exposed. [[Ref]](https://github.com/cs3org/reva/tree/master/pkg/storage/fs/eoshome/eoshome.go#L100)
GatewaySvc stores the endpoint at which the GRPC gateway is exposed. [[Ref]](https://github.com/cs3org/reva/tree/master/pkg/storage/fs/eoshome/eoshome.go#L104)
{{< highlight toml >}}
[storage.fs.eoshome]
gatewaysvc = "0.0.0.0:19000"
Expand Down
72 changes: 43 additions & 29 deletions internal/http/services/owncloud/ocdav/tus.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package ocdav
import (
"net/http"
"path"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -182,44 +183,57 @@ func (s *svc) handleTusPost(w http.ResponseWriter, r *http.Request, ns string) {
// TODO check this really streams
if r.Header.Get("Content-Type") == "application/offset+octet-stream" {

httpClient := rhttp.GetHTTPClient(
rhttp.Context(ctx),
rhttp.Timeout(time.Duration(s.c.Timeout*int64(time.Second))),
rhttp.Insecure(s.c.Insecure),
)
httpReq, err := rhttp.NewRequest(ctx, "PATCH", uRes.UploadEndpoint, r.Body)
length, err := strconv.ParseInt(r.Header.Get("Content-Length"), 10, 64)
if err != nil {
log.Err(err).Msg("wrong request")
w.WriteHeader(http.StatusInternalServerError)
w.WriteHeader(http.StatusBadRequest)
return
}

httpReq.Header.Set("Content-Type", r.Header.Get("Content-Type"))
httpReq.Header.Set("Content-Length", r.Header.Get("Content-Length"))
if r.Header.Get("Upload-Offset") != "" {
httpReq.Header.Set("Upload-Offset", r.Header.Get("Upload-Offset"))
} else {
httpReq.Header.Set("Upload-Offset", "0")
}
httpReq.Header.Set("Tus-Resumable", r.Header.Get("Tus-Resumable"))
var httpRes *http.Response

httpRes, err := httpClient.Do(httpReq)
if err != nil {
log.Err(err).Msg("error doing GET request to data service")
w.WriteHeader(http.StatusInternalServerError)
return
}
defer httpRes.Body.Close()
if length != 0 {
httpClient := rhttp.GetHTTPClient(
rhttp.Context(ctx),
rhttp.Timeout(time.Duration(s.c.Timeout*int64(time.Second))),
rhttp.Insecure(s.c.Insecure),
)
httpReq, err := rhttp.NewRequest(ctx, "PATCH", uRes.UploadEndpoint, r.Body)
if err != nil {
log.Err(err).Msg("wrong request")
w.WriteHeader(http.StatusInternalServerError)
return
}

w.Header().Set("Upload-Offset", httpRes.Header.Get("Upload-Offset"))
w.Header().Set("Tus-Resumable", httpRes.Header.Get("Tus-Resumable"))
if httpRes.StatusCode != http.StatusNoContent {
w.WriteHeader(httpRes.StatusCode)
return
httpReq.Header.Set("Content-Type", r.Header.Get("Content-Type"))
httpReq.Header.Set("Content-Length", r.Header.Get("Content-Length"))
if r.Header.Get("Upload-Offset") != "" {
httpReq.Header.Set("Upload-Offset", r.Header.Get("Upload-Offset"))
} else {
httpReq.Header.Set("Upload-Offset", "0")
}
httpReq.Header.Set("Tus-Resumable", r.Header.Get("Tus-Resumable"))

httpRes, err = httpClient.Do(httpReq)
if err != nil {
log.Err(err).Msg("error doing GET request to data service")
w.WriteHeader(http.StatusInternalServerError)
return
}
defer httpRes.Body.Close()

w.Header().Set("Upload-Offset", httpRes.Header.Get("Upload-Offset"))
w.Header().Set("Tus-Resumable", httpRes.Header.Get("Tus-Resumable"))
if httpRes.StatusCode != http.StatusNoContent {
w.WriteHeader(httpRes.StatusCode)
return
}
} else {
log.Info().Msg("Skipping sending a Patch request as body is empty")
}

// check if upload was fully completed
if httpRes.Header.Get("Upload-Offset") == r.Header.Get("Upload-Length") {
if length == 0 || httpRes.Header.Get("Upload-Offset") == r.Header.Get("Upload-Length") {
// get uploaded file metadata
sRes, err := client.Stat(ctx, sReq)
if err != nil {
Expand All @@ -246,7 +260,7 @@ func (s *svc) handleTusPost(w http.ResponseWriter, r *http.Request, ns string) {
w.WriteHeader(http.StatusInternalServerError)
return
}
if httpRes.Header.Get("X-OC-Mtime") != "" {
if httpRes != nil && httpRes.Header != nil && httpRes.Header.Get("X-OC-Mtime") != "" {
// set the "accepted" value if returned in the upload response headers
w.Header().Set("X-OC-Mtime", httpRes.Header.Get("X-OC-Mtime"))
}
Expand Down
76 changes: 70 additions & 6 deletions pkg/eosclient/eosclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ type Options struct {
// UseKeyTabAuth changes will authenticate requests by using an EOS keytab.
UseKeytab bool

// Whether to maintain the same inode across various versions of a file.
// Requires extra metadata operations if set to true
VersionInvariant bool

// SingleUsername is the username to use when connecting to EOS.
// Defaults to apache
SingleUsername string
Expand Down Expand Up @@ -401,7 +405,20 @@ func (c *Client) GetFileInfoByInode(ctx context.Context, uid, gid string, inode
if err != nil {
return nil, err
}
return c.parseFileInfo(stdout)
info, err := c.parseFileInfo(stdout)
if err != nil {
return nil, err
}

if c.opt.VersionInvariant && isVersionFolder(info.File) {
info, err = c.getFileInfoFromVersion(ctx, uid, gid, info.File)
if err != nil {
return nil, err
}
info.Inode = inode
}

return info, nil
}

// GetFileInfoByFXID returns the FileInfo by the given file id in hexadecimal
Expand Down Expand Up @@ -453,7 +470,20 @@ func (c *Client) GetFileInfoByPath(ctx context.Context, uid, gid, path string) (
if err != nil {
return nil, err
}
return c.parseFileInfo(stdout)
info, err := c.parseFileInfo(stdout)
if err != nil {
return nil, err
}

if c.opt.VersionInvariant && !isVersionFolder(path) && !info.IsDir {
inode, err := c.getVersionFolderInode(ctx, uid, gid, path)
if err != nil {
return nil, err
}
info.Inode = inode
}

return info, nil
}

// GetQuota gets the quota of a user on the quota node defined by path
Expand Down Expand Up @@ -586,8 +616,7 @@ func (c *Client) PurgeDeletedEntries(ctx context.Context, uid, gid string) error

// ListVersions list all the versions for a given file.
func (c *Client) ListVersions(ctx context.Context, uid, gid, p string) ([]*FileInfo, error) {
basename := path.Base(p)
versionFolder := path.Join(path.Dir(p), versionPrefix+basename)
versionFolder := getVersionFolder(p)
finfos, err := c.List(ctx, uid, gid, versionFolder)
if err != nil {
// we send back an empty list
Expand All @@ -605,11 +634,46 @@ func (c *Client) RollbackToVersion(ctx context.Context, uid, gid, path, version

// ReadVersion reads the version for the given file.
func (c *Client) ReadVersion(ctx context.Context, uid, gid, p, version string) (io.ReadCloser, error) {
basename := path.Base(p)
versionFile := path.Join(path.Dir(p), versionPrefix+basename, version)
versionFile := path.Join(getVersionFolder(p), version)
return c.Read(ctx, uid, gid, versionFile)
}

func (c *Client) getVersionFolderInode(ctx context.Context, uid, gid, p string) (uint64, error) {
versionFolder := getVersionFolder(p)
md, err := c.GetFileInfoByPath(ctx, uid, gid, versionFolder)
if err != nil {
if err = c.CreateDir(ctx, uid, gid, versionFolder); err != nil {
return 0, err
}
md, err = c.GetFileInfoByPath(ctx, uid, gid, versionFolder)
if err != nil {
return 0, err
}
}
return md.Inode, nil
}

func (c *Client) getFileInfoFromVersion(ctx context.Context, uid, gid, p string) (*FileInfo, error) {
file := getFileFromVersionFolder(p)
md, err := c.GetFileInfoByPath(ctx, uid, gid, file)
if err != nil {
return nil, err
}
return md, nil
}

func isVersionFolder(p string) bool {
return strings.HasPrefix(path.Base(p), versionPrefix)
}

func getVersionFolder(p string) string {
return path.Join(path.Dir(p), versionPrefix+path.Base(p))
}

func getFileFromVersionFolder(p string) string {
return path.Join(path.Dir(p), strings.TrimPrefix(path.Base(p), versionPrefix))
}

func parseRecycleList(raw string) ([]*DeletedEntry, error) {
entries := []*DeletedEntry{}
rawLines := strings.Split(raw, "\n")
Expand Down
10 changes: 10 additions & 0 deletions pkg/storage/fs/eos/eos.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ type config struct {
// UseKeyTabAuth changes will authenticate requests by using an EOS keytab.
UseKeytab bool `mapstructure:"use_keytab" docs:"false"`

// Whether to maintain the same inode across various versions of a file.
// Requires extra metadata operations if set to true
VersionInvariant bool `mapstructure:"version_invariant" docs:"true"`

// GatewaySvc stores the endpoint at which the GRPC gateway is exposed.
GatewaySvc string `mapstructure:"gatewaysvc" docs:"0.0.0.0:19000"`
}
Expand All @@ -100,6 +104,12 @@ func parseConfig(m map[string]interface{}) (*config, error) {
err = errors.Wrap(err, "error decoding conf")
return nil, err
}

// default to version invariance if not configured
if _, ok := m["version_invariant"]; !ok {
c.VersionInvariant = true
}

return c, nil
}

Expand Down
10 changes: 10 additions & 0 deletions pkg/storage/fs/eoshome/eoshome.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ type config struct {
// UseKeyTabAuth changes will authenticate requests by using an EOS keytab.
UseKeytab bool `mapstructure:"use_keytab" docs:"false"`

// Whether to maintain the same inode across various versions of a file.
// Requires extra metadata operations if set to true
VersionInvariant bool `mapstructure:"version_invariant" docs:"true"`

// GatewaySvc stores the endpoint at which the GRPC gateway is exposed.
GatewaySvc string `mapstructure:"gatewaysvc" docs:"0.0.0.0:19000"`
}
Expand All @@ -106,6 +110,12 @@ func parseConfig(m map[string]interface{}) (*config, error) {
err = errors.Wrap(err, "error decoding conf")
return nil, err
}

// default to version invariance if not configured
if _, ok := m["version_invariant"]; !ok {
c.VersionInvariant = true
}

return c, nil
}

Expand Down
5 changes: 5 additions & 0 deletions pkg/storage/utils/eosfs/eosfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ type Config struct {
// EnableHome enables the creation of home directories.
EnableHome bool `mapstructure:"enable_home"`

// Whether to maintain the same inode across various versions of a file.
// Requires extra metadata operations if set to true
VersionInvariant bool `mapstructure:"version_invariant"`

// GatewaySvc stores the endpoint at which the GRPC gateway is exposed.
GatewaySvc string `mapstructure:"gatewaysvc"`
}
Expand Down Expand Up @@ -198,6 +202,7 @@ func NewEOSFS(c *Config) (storage.FS, error) {
UseKeytab: c.UseKeytab,
Keytab: c.Keytab,
SecProtocol: c.SecProtocol,
VersionInvariant: c.VersionInvariant,
}

eosClient := eosclient.New(eosClientOpts)
Expand Down
23 changes: 10 additions & 13 deletions pkg/storage/utils/eosfs/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,21 +307,18 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) error {

// only delete the upload if it was successfully written to eos
if err == nil {
// cleanup in the background, delete might take a while and we don't need to wait for it to finish
go func() {
if err := os.Remove(upload.infoPath); err != nil {
if !os.IsNotExist(err) {
log := appctx.GetLogger(ctx)
log.Err(err).Interface("info", upload.info).Msg("eos: could not delete upload info")
}
if err := os.Remove(upload.infoPath); err != nil {
if !os.IsNotExist(err) {
log := appctx.GetLogger(ctx)
log.Err(err).Interface("info", upload.info).Msg("eos: could not delete upload info")
}
if err := os.Remove(upload.binPath); err != nil {
if !os.IsNotExist(err) {
log := appctx.GetLogger(ctx)
log.Err(err).Interface("info", upload.info).Msg("eos: could not delete upload binary")
}
}
if err := os.Remove(upload.binPath); err != nil {
if !os.IsNotExist(err) {
log := appctx.GetLogger(ctx)
log.Err(err).Interface("info", upload.info).Msg("eos: could not delete upload binary")
}
}()
}
}

// TODO: set mtime if specified in metadata
Expand Down
4 changes: 4 additions & 0 deletions pkg/storage/utils/localfs/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ func (fs *localfs) NewUpload(ctx context.Context, info tusd.FileInfo) (upload tu
binPath: binPath,
infoPath: binPath + ".info",
fs: fs,
ctx: ctx,
}

if !info.SizeIsDeferred && info.Size == 0 {
Expand Down Expand Up @@ -316,6 +317,9 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) error {
}

err := os.Rename(upload.binPath, np)
if err != nil {
return err
}

// only delete the upload if it was successfully written to the fs
if err := os.Remove(upload.infoPath); err != nil {
Expand Down

0 comments on commit a80c395

Please sign in to comment.