Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add logic in EOS FS for maintaining same inode across file versions #1174

Merged
merged 5 commits into from
Sep 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions changelog/unreleased/eos-version-invariant.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
Enhancement: Add logic in EOS FS for maintaining same inode across file versions

This PR adds the functionality to maintain the same inode across various
versions of a file by returning the inode of the version folder which remains
constant. It requires extra metadata operations so a flag is provided to disable
it.

https://github.com/cs3org/reva/pull/1174.
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,16 @@ use_keytab = false
{{< /highlight >}}
{{% /dir %}}

{{% dir name="version_invariant" type="bool" default=true %}}
Whether to maintain the same inode across various versions of a file. Requires extra metadata operations if set to true [[Ref]](https://github.com/cs3org/reva/tree/master/pkg/storage/fs/eos/eos.go#L95)
{{< highlight toml >}}
[storage.fs.eos]
version_invariant = true
{{< /highlight >}}
{{% /dir %}}

{{% dir name="gatewaysvc" type="string" default="0.0.0.0:19000" %}}
GatewaySvc stores the endpoint at which the GRPC gateway is exposed. [[Ref]](https://github.com/cs3org/reva/tree/master/pkg/storage/fs/eos/eos.go#L94)
GatewaySvc stores the endpoint at which the GRPC gateway is exposed. [[Ref]](https://github.com/cs3org/reva/tree/master/pkg/storage/fs/eos/eos.go#L98)
{{< highlight toml >}}
[storage.fs.eos]
gatewaysvc = "0.0.0.0:19000"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,16 @@ use_keytab = false
{{< /highlight >}}
{{% /dir %}}

{{% dir name="version_invariant" type="bool" default=true %}}
Whether to maintain the same inode across various versions of a file. Requires extra metadata operations if set to true [[Ref]](https://github.com/cs3org/reva/tree/master/pkg/storage/fs/eoshome/eoshome.go#L101)
{{< highlight toml >}}
[storage.fs.eoshome]
version_invariant = true
{{< /highlight >}}
{{% /dir %}}

{{% dir name="gatewaysvc" type="string" default="0.0.0.0:19000" %}}
GatewaySvc stores the endpoint at which the GRPC gateway is exposed. [[Ref]](https://github.com/cs3org/reva/tree/master/pkg/storage/fs/eoshome/eoshome.go#L100)
GatewaySvc stores the endpoint at which the GRPC gateway is exposed. [[Ref]](https://github.com/cs3org/reva/tree/master/pkg/storage/fs/eoshome/eoshome.go#L104)
{{< highlight toml >}}
[storage.fs.eoshome]
gatewaysvc = "0.0.0.0:19000"
Expand Down
72 changes: 43 additions & 29 deletions internal/http/services/owncloud/ocdav/tus.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package ocdav
import (
"net/http"
"path"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -167,44 +168,57 @@ func (s *svc) handleTusPost(w http.ResponseWriter, r *http.Request, ns string) {
// TODO check this really streams
if r.Header.Get("Content-Type") == "application/offset+octet-stream" {

httpClient := rhttp.GetHTTPClient(
rhttp.Context(ctx),
rhttp.Timeout(time.Duration(s.c.Timeout*int64(time.Second))),
rhttp.Insecure(s.c.Insecure),
)
httpReq, err := rhttp.NewRequest(ctx, "PATCH", uRes.UploadEndpoint, r.Body)
length, err := strconv.ParseInt(r.Header.Get("Content-Length"), 10, 64)
if err != nil {
log.Err(err).Msg("wrong request")
w.WriteHeader(http.StatusInternalServerError)
w.WriteHeader(http.StatusBadRequest)
return
}

httpReq.Header.Set("Content-Type", r.Header.Get("Content-Type"))
httpReq.Header.Set("Content-Length", r.Header.Get("Content-Length"))
if r.Header.Get("Upload-Offset") != "" {
httpReq.Header.Set("Upload-Offset", r.Header.Get("Upload-Offset"))
} else {
httpReq.Header.Set("Upload-Offset", "0")
}
httpReq.Header.Set("Tus-Resumable", r.Header.Get("Tus-Resumable"))
var httpRes *http.Response

httpRes, err := httpClient.Do(httpReq)
if err != nil {
log.Err(err).Msg("error doing GET request to data service")
w.WriteHeader(http.StatusInternalServerError)
return
}
defer httpRes.Body.Close()
if length != 0 {
httpClient := rhttp.GetHTTPClient(
rhttp.Context(ctx),
rhttp.Timeout(time.Duration(s.c.Timeout*int64(time.Second))),
rhttp.Insecure(s.c.Insecure),
)
httpReq, err := rhttp.NewRequest(ctx, "PATCH", uRes.UploadEndpoint, r.Body)
if err != nil {
log.Err(err).Msg("wrong request")
w.WriteHeader(http.StatusInternalServerError)
return
}

w.Header().Set("Upload-Offset", httpRes.Header.Get("Upload-Offset"))
w.Header().Set("Tus-Resumable", httpRes.Header.Get("Tus-Resumable"))
if httpRes.StatusCode != http.StatusNoContent {
w.WriteHeader(httpRes.StatusCode)
return
httpReq.Header.Set("Content-Type", r.Header.Get("Content-Type"))
httpReq.Header.Set("Content-Length", r.Header.Get("Content-Length"))
if r.Header.Get("Upload-Offset") != "" {
httpReq.Header.Set("Upload-Offset", r.Header.Get("Upload-Offset"))
} else {
httpReq.Header.Set("Upload-Offset", "0")
}
httpReq.Header.Set("Tus-Resumable", r.Header.Get("Tus-Resumable"))

httpRes, err = httpClient.Do(httpReq)
if err != nil {
log.Err(err).Msg("error doing GET request to data service")
w.WriteHeader(http.StatusInternalServerError)
return
}
defer httpRes.Body.Close()

w.Header().Set("Upload-Offset", httpRes.Header.Get("Upload-Offset"))
w.Header().Set("Tus-Resumable", httpRes.Header.Get("Tus-Resumable"))
if httpRes.StatusCode != http.StatusNoContent {
w.WriteHeader(httpRes.StatusCode)
return
}
} else {
log.Info().Msg("Skipping sending a Patch request as body is empty")
}

// check if upload was fully completed
if httpRes.Header.Get("Upload-Offset") == r.Header.Get("Upload-Length") {
if length == 0 || httpRes.Header.Get("Upload-Offset") == r.Header.Get("Upload-Length") {
// get uploaded file metadata
sRes, err := client.Stat(ctx, sReq)
if err != nil {
Expand All @@ -226,7 +240,7 @@ func (s *svc) handleTusPost(w http.ResponseWriter, r *http.Request, ns string) {
w.WriteHeader(http.StatusInternalServerError)
return
}
if httpRes.Header.Get("X-OC-Mtime") != "" {
if httpRes != nil && httpRes.Header != nil && httpRes.Header.Get("X-OC-Mtime") != "" {
// set the "accepted" value if returned in the upload response headers
w.Header().Set("X-OC-Mtime", httpRes.Header.Get("X-OC-Mtime"))
}
Expand Down
76 changes: 70 additions & 6 deletions pkg/eosclient/eosclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ type Options struct {
// UseKeyTabAuth changes will authenticate requests by using an EOS keytab.
UseKeytab bool

// Whether to maintain the same inode across various versions of a file.
// Requires extra metadata operations if set to true
VersionInvariant bool

// SingleUsername is the username to use when connecting to EOS.
// Defaults to apache
SingleUsername string
Expand Down Expand Up @@ -401,7 +405,20 @@ func (c *Client) GetFileInfoByInode(ctx context.Context, uid, gid string, inode
if err != nil {
return nil, err
}
return c.parseFileInfo(stdout)
info, err := c.parseFileInfo(stdout)
if err != nil {
return nil, err
}

if c.opt.VersionInvariant && isVersionFolder(info.File) {
info, err = c.getFileInfoFromVersion(ctx, uid, gid, info.File)
if err != nil {
return nil, err
}
info.Inode = inode
}

return info, nil
}

// GetFileInfoByFXID returns the FileInfo by the given file id in hexadecimal
Expand Down Expand Up @@ -453,7 +470,20 @@ func (c *Client) GetFileInfoByPath(ctx context.Context, uid, gid, path string) (
if err != nil {
return nil, err
}
return c.parseFileInfo(stdout)
info, err := c.parseFileInfo(stdout)
if err != nil {
return nil, err
}

if c.opt.VersionInvariant && !isVersionFolder(path) && !info.IsDir {
inode, err := c.getVersionFolderInode(ctx, uid, gid, path)
if err != nil {
return nil, err
}
info.Inode = inode
}

return info, nil
}

// GetQuota gets the quota of a user on the quota node defined by path
Expand Down Expand Up @@ -586,8 +616,7 @@ func (c *Client) PurgeDeletedEntries(ctx context.Context, uid, gid string) error

// ListVersions list all the versions for a given file.
func (c *Client) ListVersions(ctx context.Context, uid, gid, p string) ([]*FileInfo, error) {
basename := path.Base(p)
versionFolder := path.Join(path.Dir(p), versionPrefix+basename)
versionFolder := getVersionFolder(p)
finfos, err := c.List(ctx, uid, gid, versionFolder)
if err != nil {
// we send back an empty list
Expand All @@ -605,11 +634,46 @@ func (c *Client) RollbackToVersion(ctx context.Context, uid, gid, path, version

// ReadVersion reads the version for the given file.
func (c *Client) ReadVersion(ctx context.Context, uid, gid, p, version string) (io.ReadCloser, error) {
basename := path.Base(p)
versionFile := path.Join(path.Dir(p), versionPrefix+basename, version)
versionFile := path.Join(getVersionFolder(p), version)
return c.Read(ctx, uid, gid, versionFile)
}

func (c *Client) getVersionFolderInode(ctx context.Context, uid, gid, p string) (uint64, error) {
versionFolder := getVersionFolder(p)
md, err := c.GetFileInfoByPath(ctx, uid, gid, versionFolder)
if err != nil {
if err = c.CreateDir(ctx, uid, gid, versionFolder); err != nil {
return 0, err
}
md, err = c.GetFileInfoByPath(ctx, uid, gid, versionFolder)
if err != nil {
return 0, err
}
}
return md.Inode, nil
}

func (c *Client) getFileInfoFromVersion(ctx context.Context, uid, gid, p string) (*FileInfo, error) {
file := getFileFromVersionFolder(p)
md, err := c.GetFileInfoByPath(ctx, uid, gid, file)
if err != nil {
return nil, err
}
return md, nil
}

func isVersionFolder(p string) bool {
return strings.HasPrefix(path.Base(p), versionPrefix)
}

func getVersionFolder(p string) string {
return path.Join(path.Dir(p), versionPrefix+path.Base(p))
}

func getFileFromVersionFolder(p string) string {
return path.Join(path.Dir(p), strings.TrimPrefix(path.Base(p), versionPrefix))
}

func parseRecycleList(raw string) ([]*DeletedEntry, error) {
entries := []*DeletedEntry{}
rawLines := strings.Split(raw, "\n")
Expand Down
10 changes: 10 additions & 0 deletions pkg/storage/fs/eos/eos.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ type config struct {
// UseKeyTabAuth changes will authenticate requests by using an EOS keytab.
UseKeytab bool `mapstructure:"use_keytab" docs:"false"`

// Whether to maintain the same inode across various versions of a file.
// Requires extra metadata operations if set to true
VersionInvariant bool `mapstructure:"version_invariant" docs:"true"`

// GatewaySvc stores the endpoint at which the GRPC gateway is exposed.
GatewaySvc string `mapstructure:"gatewaysvc" docs:"0.0.0.0:19000"`
}
Expand All @@ -100,6 +104,12 @@ func parseConfig(m map[string]interface{}) (*config, error) {
err = errors.Wrap(err, "error decoding conf")
return nil, err
}

// default to version invariance if not configured
if _, ok := m["version_invariant"]; !ok {
c.VersionInvariant = true
}

return c, nil
}

Expand Down
10 changes: 10 additions & 0 deletions pkg/storage/fs/eoshome/eoshome.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ type config struct {
// UseKeyTabAuth changes will authenticate requests by using an EOS keytab.
UseKeytab bool `mapstructure:"use_keytab" docs:"false"`

// Whether to maintain the same inode across various versions of a file.
// Requires extra metadata operations if set to true
VersionInvariant bool `mapstructure:"version_invariant" docs:"true"`

// GatewaySvc stores the endpoint at which the GRPC gateway is exposed.
GatewaySvc string `mapstructure:"gatewaysvc" docs:"0.0.0.0:19000"`
}
Expand All @@ -106,6 +110,12 @@ func parseConfig(m map[string]interface{}) (*config, error) {
err = errors.Wrap(err, "error decoding conf")
return nil, err
}

// default to version invariance if not configured
if _, ok := m["version_invariant"]; !ok {
c.VersionInvariant = true
}

return c, nil
}

Expand Down
5 changes: 5 additions & 0 deletions pkg/storage/utils/eosfs/eosfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ type Config struct {
// EnableHome enables the creation of home directories.
EnableHome bool `mapstructure:"enable_home"`

// Whether to maintain the same inode across various versions of a file.
// Requires extra metadata operations if set to true
VersionInvariant bool `mapstructure:"version_invariant"`

// GatewaySvc stores the endpoint at which the GRPC gateway is exposed.
GatewaySvc string `mapstructure:"gatewaysvc"`
}
Expand Down Expand Up @@ -198,6 +202,7 @@ func NewEOSFS(c *Config) (storage.FS, error) {
UseKeytab: c.UseKeytab,
Keytab: c.Keytab,
SecProtocol: c.SecProtocol,
VersionInvariant: c.VersionInvariant,
}

eosClient := eosclient.New(eosClientOpts)
Expand Down
23 changes: 10 additions & 13 deletions pkg/storage/utils/eosfs/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,21 +307,18 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) error {

// only delete the upload if it was successfully written to eos
if err == nil {
// cleanup in the background, delete might take a while and we don't need to wait for it to finish
go func() {
if err := os.Remove(upload.infoPath); err != nil {
if !os.IsNotExist(err) {
log := appctx.GetLogger(ctx)
log.Err(err).Interface("info", upload.info).Msg("eos: could not delete upload info")
}
if err := os.Remove(upload.infoPath); err != nil {
if !os.IsNotExist(err) {
log := appctx.GetLogger(ctx)
log.Err(err).Interface("info", upload.info).Msg("eos: could not delete upload info")
}
if err := os.Remove(upload.binPath); err != nil {
if !os.IsNotExist(err) {
log := appctx.GetLogger(ctx)
log.Err(err).Interface("info", upload.info).Msg("eos: could not delete upload binary")
}
}
if err := os.Remove(upload.binPath); err != nil {
if !os.IsNotExist(err) {
log := appctx.GetLogger(ctx)
log.Err(err).Interface("info", upload.info).Msg("eos: could not delete upload binary")
}
}()
}
}

// TODO: set mtime if specified in metadata
Expand Down
4 changes: 4 additions & 0 deletions pkg/storage/utils/localfs/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ func (fs *localfs) NewUpload(ctx context.Context, info tusd.FileInfo) (upload tu
binPath: binPath,
infoPath: binPath + ".info",
fs: fs,
ctx: ctx,
}

if !info.SizeIsDeferred && info.Size == 0 {
Expand Down Expand Up @@ -316,6 +317,9 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) error {
}

err := os.Rename(upload.binPath, np)
if err != nil {
return err
}

// only delete the upload if it was successfully written to the fs
if err := os.Remove(upload.infoPath); err != nil {
Expand Down