diff --git a/pkg/storage/utils/decomposedfs/options/options.go b/pkg/storage/utils/decomposedfs/options/options.go index 16811837a74..b69a4210bd3 100644 --- a/pkg/storage/utils/decomposedfs/options/options.go +++ b/pkg/storage/utils/decomposedfs/options/options.go @@ -51,6 +51,15 @@ type Options struct { PersonalSpaceAliasTemplate string `mapstructure:"personalspacealias_template"` GeneralSpaceAliasTemplate string `mapstructure:"generalspacealias_template"` + + Postprocessing PostprocessingOptions `mapstructure:"postprocessing"` +} + +// PostprocessingOptions defines the available options for postprocessing +type PostprocessingOptions struct { + ASyncFileUploads bool `mapstructure:"asyncfileuploads"` + + LockProcessing bool `mapstructure:"lockprocessing"` // for testing processing - maybe not needed? } // New returns a new Options instance for the given configuration diff --git a/pkg/storage/utils/decomposedfs/upload.go b/pkg/storage/utils/decomposedfs/upload.go index 58685e06dec..5e0e29917b3 100644 --- a/pkg/storage/utils/decomposedfs/upload.go +++ b/pkg/storage/utils/decomposedfs/upload.go @@ -191,12 +191,12 @@ func (fs *Decomposedfs) UseIn(composer *tusd.StoreComposer) { // NewUpload returns a new tus Upload instance func (fs *Decomposedfs) NewUpload(ctx context.Context, info tusd.FileInfo) (tusd.Upload, error) { - return upload.New(ctx, info, fs.lu, fs.tp, fs.p, fs.o.Root) + return upload.New(ctx, info, fs.lu, fs.tp, fs.p, fs.o.Root, fs.o.Postprocessing) } // GetUpload returns the Upload for the given upload id func (fs *Decomposedfs) GetUpload(ctx context.Context, id string) (tusd.Upload, error) { - return upload.Get(ctx, id, fs.lu, fs.tp, fs.o.Root) + return upload.Get(ctx, id, fs.lu, fs.tp, fs.o.Root, fs.o.Postprocessing) } // AsTerminatableUpload returns a TerminatableUpload diff --git a/pkg/storage/utils/decomposedfs/upload/postprocessing.go b/pkg/storage/utils/decomposedfs/upload/postprocessing.go index 88efd8a3034..fe9065a0c5d 100644 --- a/pkg/storage/utils/decomposedfs/upload/postprocessing.go +++ b/pkg/storage/utils/decomposedfs/upload/postprocessing.go @@ -18,10 +18,17 @@ package upload -import "github.com/cs3org/reva/v2/pkg/utils/postprocessing" +import ( + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options" + "github.com/cs3org/reva/v2/pkg/utils/postprocessing" +) + +func configurePostprocessing(upload *Upload, o options.PostprocessingOptions) postprocessing.Postprocessing { + waitfor := []string{"initialize"} + if !o.ASyncFileUploads { + waitfor = append(waitfor, "assembling") + } -func configurePostprocessing(upload *Upload) postprocessing.Postprocessing { - // TODO: make configurable return postprocessing.Postprocessing{ Steps: []postprocessing.Step{ postprocessing.NewStep("initialize", func() error { @@ -37,15 +44,16 @@ func configurePostprocessing(upload *Upload) postprocessing.Postprocessing { }, nil), postprocessing.NewStep("assembling", upload.finishUpload, upload.cleanup, "initialize"), }, - WaitFor: []string{"assembling"}, // needed for testsuite atm, see comment in upload.cleanup - Finish: func(_ map[string]error) { - // TODO: Handle postprocessing errors - - if upload.node != nil { - // temp if to lock marie in eternal processing - dont merge with this - if upload.node.SpaceID == "f7fbf8c8-139b-4376-b307-cf0a8c2d0d9c" && upload.node.SpaceID != upload.node.ID { - return + WaitFor: waitfor, + Finish: func(m map[string]error) { + for alias, err := range m { + if err != nil { + upload.log.Info().Str("step", alias).Err(err).Msg("postprocessing failed") } + + } + + if upload.node != nil && !o.LockProcessing { // unset processing status _ = upload.node.RemoveMetadata("user.ocis.nodestatus") } diff --git a/pkg/storage/utils/decomposedfs/upload/processing.go b/pkg/storage/utils/decomposedfs/upload/processing.go index 1b5d4b51f98..158f95c23fb 100644 --- a/pkg/storage/utils/decomposedfs/upload/processing.go +++ b/pkg/storage/utils/decomposedfs/upload/processing.go @@ -35,6 +35,7 @@ import ( "github.com/cs3org/reva/v2/pkg/storage/utils/chunking" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options" "github.com/cs3org/reva/v2/pkg/utils" "github.com/google/uuid" "github.com/pkg/errors" @@ -50,7 +51,7 @@ type PermissionsChecker interface { } // New returns a new processing instance -func New(ctx context.Context, info tusd.FileInfo, lu *lookup.Lookup, tp Tree, p PermissionsChecker, fsRoot string) (upload *Upload, err error) { +func New(ctx context.Context, info tusd.FileInfo, lu *lookup.Lookup, tp Tree, p PermissionsChecker, fsRoot string, o options.PostprocessingOptions) (upload *Upload, err error) { log := appctx.GetLogger(ctx) log.Debug().Interface("info", info).Msg("Decomposedfs: NewUpload") @@ -155,6 +156,11 @@ func New(ctx context.Context, info tusd.FileInfo, lu *lookup.Lookup, tp Tree, p lu: lu, tp: tp, Ctx: ctx, + log: appctx.GetLogger(ctx). + With(). + Interface("info", info). + Str("binPath", binPath). + Logger(), } // writeInfo creates the file by itself if necessary @@ -163,12 +169,12 @@ func New(ctx context.Context, info tusd.FileInfo, lu *lookup.Lookup, tp Tree, p return nil, err } - u.pp = configurePostprocessing(u) + u.pp = configurePostprocessing(u, o) return u, nil } // Get returns the Upload for the given upload id -func Get(ctx context.Context, id string, lu *lookup.Lookup, tp Tree, fsRoot string) (*Upload, error) { +func Get(ctx context.Context, id string, lu *lookup.Lookup, tp Tree, fsRoot string, o options.PostprocessingOptions) (*Upload, error) { infoPath := filepath.Join(fsRoot, "uploads", id+".info") info := tusd.FileInfo{} @@ -221,7 +227,7 @@ func Get(ctx context.Context, id string, lu *lookup.Lookup, tp Tree, fsRoot stri Ctx: ctx, } - up.pp = configurePostprocessing(up) + up.pp = configurePostprocessing(up, o) return up, nil } diff --git a/pkg/storage/utils/decomposedfs/upload/upload.go b/pkg/storage/utils/decomposedfs/upload/upload.go index ecfc7f5b077..3eb4590718d 100644 --- a/pkg/storage/utils/decomposedfs/upload/upload.go +++ b/pkg/storage/utils/decomposedfs/upload/upload.go @@ -92,7 +92,8 @@ type Upload struct { oldsize *uint64 // Postprocessing to start postprocessing pp postprocessing.Postprocessing - // TODO add logger as well? + // and a logger as well + log zerolog.Logger } // WriteChunk writes the stream from the reader to the given offset of the upload diff --git a/pkg/utils/postprocessing/postprocessing_test.go b/pkg/utils/postprocessing/postprocessing_test.go index 02386a99691..0af219b8007 100644 --- a/pkg/utils/postprocessing/postprocessing_test.go +++ b/pkg/utils/postprocessing/postprocessing_test.go @@ -24,7 +24,7 @@ import ( "testing" "time" - "github.com/cs3org/reva/v2/pkg/utils/postprocessing" + pp "github.com/cs3org/reva/v2/pkg/utils/postprocessing" "github.com/test-go/testify/require" ) @@ -72,9 +72,9 @@ func (b *Cbool) Get() bool { func Test_ItRunsStepsAsync(t *testing.T) { stepdone := Bool() - pp := postprocessing.Postprocessing{ - Steps: []postprocessing.Step{ - postprocessing.NewStep("stepA", FailureAfter(_waitTime), func(error) { + pp := pp.Postprocessing{ + Steps: []pp.Step{ + pp.NewStep("stepA", FailureAfter(_waitTime), func(error) { stepdone.Set(true) }), }, @@ -87,9 +87,9 @@ func Test_ItRunsStepsAsync(t *testing.T) { func Test_ItSyncsIfConfigured(t *testing.T) { stepdone := Bool() - pp := postprocessing.Postprocessing{ - Steps: []postprocessing.Step{ - postprocessing.NewStep("stepA", FailureAfter(_waitTime), func(error) { + pp := pp.Postprocessing{ + Steps: []pp.Step{ + pp.NewStep("stepA", FailureAfter(_waitTime), func(error) { stepdone.Set(true) }), }, @@ -104,16 +104,16 @@ func Test_ItSyncsIfConfigured(t *testing.T) { func Test_ItRunsStepsInParallel(t *testing.T) { astarted, afinished := Bool(), Bool() bstarted, bfinished := Bool(), Bool() - pp := postprocessing.Postprocessing{ - Steps: []postprocessing.Step{ - postprocessing.NewStep("stepA", func() error { + pp := pp.Postprocessing{ + Steps: []pp.Step{ + pp.NewStep("stepA", func() error { astarted.Set(true) time.Sleep(_waitTime) return nil }, func(error) { afinished.Set(true) }), - postprocessing.NewStep("stepB", func() error { + pp.NewStep("stepB", func() error { bstarted.Set(true) time.Sleep(_waitTime) return nil @@ -134,14 +134,14 @@ func Test_ItRunsStepsInParallel(t *testing.T) { func Test_ItWaitsForSpecificSteps(t *testing.T) { stepdone := Bool() - pp := postprocessing.Postprocessing{ - Steps: []postprocessing.Step{ - postprocessing.NewStep("stepA", func() error { + pp := pp.Postprocessing{ + Steps: []pp.Step{ + pp.NewStep("stepA", func() error { time.Sleep(_waitTime) stepdone.Set(true) return nil }, nil), - postprocessing.NewStep("stepB", func() error { + pp.NewStep("stepB", func() error { if !stepdone.Get() { return errors.New("step not done") } @@ -159,14 +159,14 @@ func Test_ItCollectsStepResults(t *testing.T) { wg := sync.WaitGroup{} wg.Add(1) var results map[string]error - pp := postprocessing.Postprocessing{ - Steps: []postprocessing.Step{ - postprocessing.NewStep("stepA", func() error { + pp := pp.Postprocessing{ + Steps: []pp.Step{ + pp.NewStep("stepA", func() error { time.Sleep(_waitTime) return errors.New("stepA failed") }, nil), - postprocessing.NewStep("stepB", SuccessAfter(_waitTime), nil), - postprocessing.NewStep("stepC", func() error { + pp.NewStep("stepB", SuccessAfter(_waitTime), nil), + pp.NewStep("stepC", func() error { time.Sleep(_waitTime) return errors.New("stepC failed") }, nil),