Skip to content

Commit

Permalink
Move chunking assembly logic to fs
Browse files Browse the repository at this point in the history
  • Loading branch information
ishank011 committed Oct 30, 2020
1 parent 4942c89 commit 28969c7
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 86 deletions.
7 changes: 6 additions & 1 deletion internal/http/services/dataprovider/put.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,28 @@ import (

provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/pkg/appctx"
"github.com/cs3org/reva/pkg/errtypes"
)

func (s *svc) doPut(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
log := appctx.GetLogger(ctx)
fn := r.URL.Path
defer r.Body.Close()

fsfn := strings.TrimPrefix(fn, s.conf.Prefix)
ref := &provider.Reference{Spec: &provider.Reference_Path{Path: fsfn}}

err := s.storage.Upload(ctx, ref, r.Body)
if err != nil {
if _, ok := err.(errtypes.IsPartialContent); ok {
w.WriteHeader(http.StatusPartialContent)
return
}
log.Error().Err(err).Msg("error uploading file")
w.WriteHeader(http.StatusInternalServerError)
return
}

r.Body.Close()
w.WriteHeader(http.StatusOK)
}
31 changes: 4 additions & 27 deletions internal/http/services/owncloud/ocdav/put.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"io"
"net/http"
"path"
"regexp"
"strconv"
"time"

Expand All @@ -35,11 +34,6 @@ import (
"github.com/cs3org/reva/pkg/utils"
)

func isChunked(fn string) (bool, error) {
// FIXME: also need to check whether the OC-Chunked header is set
return regexp.MatchString(`-chunking-\w+-[0-9]+-[0-9]+$`, fn)
}

func sufferMacOSFinder(r *http.Request) bool {
return r.Header.Get("X-Expected-Entity-Length") != ""
}
Expand Down Expand Up @@ -118,27 +112,6 @@ func (s *svc) handlePut(w http.ResponseWriter, r *http.Request, ns string) {
return
}

ok, err := isChunked(fn)
if err != nil {
log.Error().Err(err).Msg("error checking if request is chunked or not")
w.WriteHeader(http.StatusInternalServerError)
return
}

if ok {
// TODO: disable if chunking capability is turned off in config
/**
if s.c.Capabilities.Dav.Chunking == "1.0" {
s.handlePutChunked(w, r)
} else {
log.Error().Err(err).Msg("chunking 1.0 is not enabled")
w.WriteHeader(http.StatusBadRequest)
}
*/
s.handlePutChunked(w, r, ns)
return
}

if isContentRange(r) {
log.Warn().Msg("Content-Range not supported for PUT")
w.WriteHeader(http.StatusNotImplemented)
Expand Down Expand Up @@ -279,6 +252,10 @@ func (s *svc) handlePutHelper(w http.ResponseWriter, r *http.Request, content io
}
defer httpRes.Body.Close()
if httpRes.StatusCode != http.StatusOK {
if httpRes.StatusCode == http.StatusPartialContent {
w.WriteHeader(http.StatusPartialContent)
return
}
log.Err(err).Msg("PUT request to data server failed")
w.WriteHeader(http.StatusInternalServerError)
return
Expand Down
19 changes: 18 additions & 1 deletion pkg/errtypes/errtypes.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ type InternalError string

func (e InternalError) Error() string { return "internal error: " + string(e) }

// IsInternalError is the method to check for w
func (e InternalError) IsInternalError() {}

// PermissionDenied is the error to use when a resource cannot be access because of missing permissions.
type PermissionDenied string

Expand Down Expand Up @@ -75,6 +78,14 @@ func (e NotSupported) Error() string { return "error: not supported: " + string(
// IsNotSupported implements the IsNotSupported interface.
func (e NotSupported) IsNotSupported() {}

// PartialContent is the error to use when the client request has partial data.
type PartialContent string

func (e PartialContent) Error() string { return "error: partial content: " + string(e) }

// IsPartialContent implements the IsPartialContent interface.
func (e PartialContent) IsPartialContent() {}

// IsNotFound is the interface to implement
// to specify that an a resource is not found.
type IsNotFound interface {
Expand Down Expand Up @@ -112,7 +123,13 @@ type IsNotSupported interface {
}

// IsPermissionDenied is the interface to implement
// to specify that an action is not supported.
// to specify that an action is denied.
type IsPermissionDenied interface {
IsPermissionDenied()
}

// IsPartialContent is the interface to implement
// to specify that the client request has partial data.
type IsPartialContent interface {
IsPartialContent()
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,33 +16,39 @@
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

package ocdav
package chunking

import (
"context"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"path"
"path/filepath"
"regexp"
"strconv"
"strings"

"github.com/cs3org/reva/pkg/appctx"
)

// ChunkHandler manages chunked uploads, storing the chunks in a temporary directory
// until it gets the final chunk which is then returned.
type ChunkHandler struct {
ChunkFolder string `mapstructure:"chunk_folder"`
}

// NewChunkHandler creates a handler for chunked uploads.
func NewChunkHandler(chunkFolder string) *ChunkHandler {
return &ChunkHandler{chunkFolder}
}

type chunkBLOBInfo struct {
path string
transferID string
totalChunks int64
currentChunk int64
}

// not using the resource path in the chunk folder name allows uploading
// to the same folder after a move without having to restart the chunk
// upload
// Not using the resource path in the chunk folder name allows uploading to
// the same folder after a move without having to restart the chunk upload
func (c *chunkBLOBInfo) uploadID() string {
return fmt.Sprintf("chunking-%s-%d", c.transferID, c.totalChunks)
}
Expand Down Expand Up @@ -72,35 +78,31 @@ func getChunkBLOBInfo(path string) (*chunkBLOBInfo, error) {
}, nil
}

func (s *svc) createChunkTempFile() (string, *os.File, error) {
file, err := ioutil.TempFile(fmt.Sprintf("/%s", s.c.ChunkFolder), "")
func (c *ChunkHandler) createChunkTempFile() (string, *os.File, error) {
file, err := ioutil.TempFile(fmt.Sprintf("/%s", c.ChunkFolder), "")
if err != nil {
return "", nil, err
}

return file.Name(), file, nil
}

func (s *svc) getChunkFolderName(i *chunkBLOBInfo) (string, error) {
path := "/" + s.c.ChunkFolder + filepath.Clean("/"+i.uploadID())
func (c *ChunkHandler) getChunkFolderName(i *chunkBLOBInfo) (string, error) {
path := "/" + c.ChunkFolder + filepath.Clean("/"+i.uploadID())
if err := os.MkdirAll(path, 0755); err != nil {
return "", err
}
return path, nil
}

func (s *svc) saveChunk(ctx context.Context, path string, r io.ReadCloser) (bool, string, error) {
log := appctx.GetLogger(ctx)
func (c *ChunkHandler) saveChunk(path string, r io.ReadCloser) (bool, string, error) {
chunkInfo, err := getChunkBLOBInfo(path)
if err != nil {
err := fmt.Errorf("error getting chunk info from path: %s", path)
return false, "", err
}

//c.logger.Info().Log("chunknum", chunkInfo.currentChunk, "chunks", chunkInfo.totalChunks,
//"transferid", chunkInfo.transferID, "uploadid", chunkInfo.uploadID())

chunkTempFilename, chunkTempFile, err := s.createChunkTempFile()
chunkTempFilename, chunkTempFile, err := c.createChunkTempFile()
if err != nil {
return false, "", err
}
Expand All @@ -116,7 +118,7 @@ func (s *svc) saveChunk(ctx context.Context, path string, r io.ReadCloser) (bool
return false, "", err
}

chunksFolderName, err := s.getChunkFolderName(chunkInfo)
chunksFolderName, err := c.getChunkFolderName(chunkInfo)
if err != nil {
return false, "", err
}
Expand Down Expand Up @@ -144,7 +146,7 @@ func (s *svc) saveChunk(ctx context.Context, path string, r io.ReadCloser) (bool
return false, "", err
}

// there is still some chunks to be uploaded.
// there are still some chunks to be uploaded.
// we return CodeUploadIsPartial to notify upper layers that the upload is still
// not complete and requires more actions.
// This code is needed to notify the owncloud webservice that the upload has not yet been
Expand All @@ -153,7 +155,7 @@ func (s *svc) saveChunk(ctx context.Context, path string, r io.ReadCloser) (bool
return false, "", nil
}

assembledFileName, assembledFile, err := s.createChunkTempFile()
assembledFileName, assembledFile, err := c.createChunkTempFile()
if err != nil {
return false, "", err
}
Expand Down Expand Up @@ -183,57 +185,38 @@ func (s *svc) saveChunk(ctx context.Context, path string, r io.ReadCloser) (bool

// at this point the assembled file is complete
// so we free space removing the chunks folder
defer func() {
if err = os.RemoveAll(chunksFolderName); err != nil {
log.Warn().Err(err).Msg("error deleting chunk folder, remove folder manually/cron to not leak storage space")
}
}()
defer os.RemoveAll(chunksFolderName)

return true, assembledFileName, nil
}
func (s *svc) handlePutChunked(w http.ResponseWriter, r *http.Request, ns string) {
ctx := r.Context()
log := appctx.GetLogger(ctx)

fn := r.URL.Path

if r.Body == nil {
log.Warn().Msg("body is nil")
w.WriteHeader(http.StatusBadRequest)
return
}
func (c *ChunkHandler) IsChunked(fn string) (bool, error) {
// FIXME: also need to check whether the OC-Chunked header is set
return regexp.MatchString(`-chunking-\w+-[0-9]+-[0-9]+$`, fn)
}

finish, chunk, err := s.saveChunk(ctx, fn, r.Body)
func (c *ChunkHandler) WriteChunk(fn string, r io.ReadCloser) (string, io.ReadCloser, error) {
finish, chunk, err := c.saveChunk(fn, r)
if err != nil {
log.Error().Err(err).Msg("error saving chunk")
w.WriteHeader(http.StatusInternalServerError)
return
return "", nil, err
}

if !finish {
w.WriteHeader(http.StatusPartialContent)
return
return "", nil, nil
}

fd, err := os.Open(chunk)
if err != nil {
log.Error().Err(err).Msg("error opening chunk")
w.WriteHeader(http.StatusInternalServerError)
return
return "", nil, err
}
defer fd.Close()

md, err := fd.Stat()
chunkInfo, err := getChunkBLOBInfo(fn)
if err != nil {
log.Error().Err(err).Msg("error statting chunk")
w.WriteHeader(http.StatusInternalServerError)
return
return "", nil, err
}

chunkInfo, _ := getChunkBLOBInfo(fn)
fn = path.Join(applyLayout(ctx, ns), chunkInfo.path)

s.handlePutHelper(w, r, fd, fn, md.Size())
return chunkInfo.path, fd, nil

// TODO(labkode): implement old chunking

Expand Down
7 changes: 5 additions & 2 deletions pkg/storage/utils/eosfs/eosfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/cs3org/reva/pkg/sharedconf"
"github.com/cs3org/reva/pkg/storage"
"github.com/cs3org/reva/pkg/storage/utils/acl"
"github.com/cs3org/reva/pkg/storage/utils/chunking"
"github.com/cs3org/reva/pkg/storage/utils/grants"
"github.com/cs3org/reva/pkg/storage/utils/templates"
"github.com/cs3org/reva/pkg/user"
Expand Down Expand Up @@ -175,6 +176,7 @@ func (c *Config) init() {
type eosfs struct {
c *eosclient.Client
conf *Config
chunkHandler *chunking.ChunkHandler
singleUserUID string
singleUserGID string
}
Expand Down Expand Up @@ -208,8 +210,9 @@ func NewEOSFS(c *Config) (storage.FS, error) {
eosClient := eosclient.New(eosClientOpts)

eosfs := &eosfs{
c: eosClient,
conf: c,
c: eosClient,
conf: c,
chunkHandler: chunking.NewChunkHandler(c.CacheDirectory),
}

return eosfs, nil
Expand Down
15 changes: 14 additions & 1 deletion pkg/storage/utils/eosfs/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,21 @@ func (fs *eosfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadC
return errtypes.PermissionDenied("eos: cannot upload under the virtual share folder")
}

fn := fs.wrap(ctx, p)
ok, err := fs.chunkHandler.IsChunked(p)
if err != nil {
return errors.Wrap(err, "eos: error resolving reference")
}
if ok {
p, r, err = fs.chunkHandler.WriteChunk(p, r)
if err != nil {
return err
}
if p == "" {
return errtypes.PartialContent(ref.String())
}
}

fn := fs.wrap(ctx, p)
return fs.c.Write(ctx, uid, gid, fn, r)
}

Expand Down

0 comments on commit 28969c7

Please sign in to comment.