Skip to content

Commit

Permalink
make postprocessing configurable
Browse files Browse the repository at this point in the history
Signed-off-by: jkoberg <jkoberg@owncloud.com>
  • Loading branch information
kobergj committed Jun 24, 2022
1 parent 62864c2 commit 0c56367
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 38 deletions.
9 changes: 9 additions & 0 deletions pkg/storage/utils/decomposedfs/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,15 @@ type Options struct {

PersonalSpaceAliasTemplate string `mapstructure:"personalspacealias_template"`
GeneralSpaceAliasTemplate string `mapstructure:"generalspacealias_template"`

Postprocessing PostprocessingOptions `mapstructure:"postprocessing"`
}

// PostprocessingOptions defines the available options for postprocessing
type PostprocessingOptions struct {
ASyncFileUploads bool `mapstructure:"asyncfileuploads"`

LockProcessing bool `mapstructure:"lockprocessing"` // for testing processing - maybe not needed?
}

// New returns a new Options instance for the given configuration
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/utils/decomposedfs/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,12 +191,12 @@ func (fs *Decomposedfs) UseIn(composer *tusd.StoreComposer) {

// NewUpload returns a new tus Upload instance
func (fs *Decomposedfs) NewUpload(ctx context.Context, info tusd.FileInfo) (tusd.Upload, error) {
return upload.New(ctx, info, fs.lu, fs.tp, fs.p, fs.o.Root)
return upload.New(ctx, info, fs.lu, fs.tp, fs.p, fs.o.Root, fs.o.Postprocessing)
}

// GetUpload returns the Upload for the given upload id
func (fs *Decomposedfs) GetUpload(ctx context.Context, id string) (tusd.Upload, error) {
return upload.Get(ctx, id, fs.lu, fs.tp, fs.o.Root)
return upload.Get(ctx, id, fs.lu, fs.tp, fs.o.Root, fs.o.Postprocessing)
}

// AsTerminatableUpload returns a TerminatableUpload
Expand Down
30 changes: 19 additions & 11 deletions pkg/storage/utils/decomposedfs/upload/postprocessing.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,17 @@

package upload

import "github.com/cs3org/reva/v2/pkg/utils/postprocessing"
import (
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options"
"github.com/cs3org/reva/v2/pkg/utils/postprocessing"
)

func configurePostprocessing(upload *Upload, o options.PostprocessingOptions) postprocessing.Postprocessing {
waitfor := []string{"initialize"}
if !o.ASyncFileUploads {
waitfor = append(waitfor, "assembling")
}

func configurePostprocessing(upload *Upload) postprocessing.Postprocessing {
// TODO: make configurable
return postprocessing.Postprocessing{
Steps: []postprocessing.Step{
postprocessing.NewStep("initialize", func() error {
Expand All @@ -37,15 +44,16 @@ func configurePostprocessing(upload *Upload) postprocessing.Postprocessing {
}, nil),
postprocessing.NewStep("assembling", upload.finishUpload, upload.cleanup, "initialize"),
},
WaitFor: []string{"assembling"}, // needed for testsuite atm, see comment in upload.cleanup
Finish: func(_ map[string]error) {
// TODO: Handle postprocessing errors

if upload.node != nil {
// temp if to lock marie in eternal processing - dont merge with this
if upload.node.SpaceID == "f7fbf8c8-139b-4376-b307-cf0a8c2d0d9c" && upload.node.SpaceID != upload.node.ID {
return
WaitFor: waitfor,
Finish: func(m map[string]error) {
for alias, err := range m {
if err != nil {
upload.log.Info().Str("step", alias).Err(err).Msg("postprocessing failed")
}

}

if upload.node != nil && !o.LockProcessing {
// unset processing status
_ = upload.node.RemoveMetadata("user.ocis.nodestatus")
}
Expand Down
14 changes: 10 additions & 4 deletions pkg/storage/utils/decomposedfs/upload/processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/cs3org/reva/v2/pkg/storage/utils/chunking"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/google/uuid"
"github.com/pkg/errors"
Expand All @@ -50,7 +51,7 @@ type PermissionsChecker interface {
}

// New returns a new processing instance
func New(ctx context.Context, info tusd.FileInfo, lu *lookup.Lookup, tp Tree, p PermissionsChecker, fsRoot string) (upload *Upload, err error) {
func New(ctx context.Context, info tusd.FileInfo, lu *lookup.Lookup, tp Tree, p PermissionsChecker, fsRoot string, o options.PostprocessingOptions) (upload *Upload, err error) {

log := appctx.GetLogger(ctx)
log.Debug().Interface("info", info).Msg("Decomposedfs: NewUpload")
Expand Down Expand Up @@ -155,6 +156,11 @@ func New(ctx context.Context, info tusd.FileInfo, lu *lookup.Lookup, tp Tree, p
lu: lu,
tp: tp,
Ctx: ctx,
log: appctx.GetLogger(ctx).
With().
Interface("info", info).
Str("binPath", binPath).
Logger(),
}

// writeInfo creates the file by itself if necessary
Expand All @@ -163,12 +169,12 @@ func New(ctx context.Context, info tusd.FileInfo, lu *lookup.Lookup, tp Tree, p
return nil, err
}

u.pp = configurePostprocessing(u)
u.pp = configurePostprocessing(u, o)
return u, nil
}

// Get returns the Upload for the given upload id
func Get(ctx context.Context, id string, lu *lookup.Lookup, tp Tree, fsRoot string) (*Upload, error) {
func Get(ctx context.Context, id string, lu *lookup.Lookup, tp Tree, fsRoot string, o options.PostprocessingOptions) (*Upload, error) {
infoPath := filepath.Join(fsRoot, "uploads", id+".info")

info := tusd.FileInfo{}
Expand Down Expand Up @@ -221,7 +227,7 @@ func Get(ctx context.Context, id string, lu *lookup.Lookup, tp Tree, fsRoot stri
Ctx: ctx,
}

up.pp = configurePostprocessing(up)
up.pp = configurePostprocessing(up, o)
return up, nil
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/storage/utils/decomposedfs/upload/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ type Upload struct {
oldsize *uint64
// Postprocessing to start postprocessing
pp postprocessing.Postprocessing
// TODO add logger as well?
// and a logger as well
log zerolog.Logger
}

// WriteChunk writes the stream from the reader to the given offset of the upload
Expand Down
40 changes: 20 additions & 20 deletions pkg/utils/postprocessing/postprocessing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"testing"
"time"

"github.com/cs3org/reva/v2/pkg/utils/postprocessing"
pp "github.com/cs3org/reva/v2/pkg/utils/postprocessing"
"github.com/test-go/testify/require"
)

Expand Down Expand Up @@ -72,9 +72,9 @@ func (b *Cbool) Get() bool {

func Test_ItRunsStepsAsync(t *testing.T) {
stepdone := Bool()
pp := postprocessing.Postprocessing{
Steps: []postprocessing.Step{
postprocessing.NewStep("stepA", FailureAfter(_waitTime), func(error) {
pp := pp.Postprocessing{
Steps: []pp.Step{
pp.NewStep("stepA", FailureAfter(_waitTime), func(error) {
stepdone.Set(true)
}),
},
Expand All @@ -87,9 +87,9 @@ func Test_ItRunsStepsAsync(t *testing.T) {

func Test_ItSyncsIfConfigured(t *testing.T) {
stepdone := Bool()
pp := postprocessing.Postprocessing{
Steps: []postprocessing.Step{
postprocessing.NewStep("stepA", FailureAfter(_waitTime), func(error) {
pp := pp.Postprocessing{
Steps: []pp.Step{
pp.NewStep("stepA", FailureAfter(_waitTime), func(error) {
stepdone.Set(true)
}),
},
Expand All @@ -104,16 +104,16 @@ func Test_ItSyncsIfConfigured(t *testing.T) {
func Test_ItRunsStepsInParallel(t *testing.T) {
astarted, afinished := Bool(), Bool()
bstarted, bfinished := Bool(), Bool()
pp := postprocessing.Postprocessing{
Steps: []postprocessing.Step{
postprocessing.NewStep("stepA", func() error {
pp := pp.Postprocessing{
Steps: []pp.Step{
pp.NewStep("stepA", func() error {
astarted.Set(true)
time.Sleep(_waitTime)
return nil
}, func(error) {
afinished.Set(true)
}),
postprocessing.NewStep("stepB", func() error {
pp.NewStep("stepB", func() error {
bstarted.Set(true)
time.Sleep(_waitTime)
return nil
Expand All @@ -134,14 +134,14 @@ func Test_ItRunsStepsInParallel(t *testing.T) {

func Test_ItWaitsForSpecificSteps(t *testing.T) {
stepdone := Bool()
pp := postprocessing.Postprocessing{
Steps: []postprocessing.Step{
postprocessing.NewStep("stepA", func() error {
pp := pp.Postprocessing{
Steps: []pp.Step{
pp.NewStep("stepA", func() error {
time.Sleep(_waitTime)
stepdone.Set(true)
return nil
}, nil),
postprocessing.NewStep("stepB", func() error {
pp.NewStep("stepB", func() error {
if !stepdone.Get() {
return errors.New("step not done")
}
Expand All @@ -159,14 +159,14 @@ func Test_ItCollectsStepResults(t *testing.T) {
wg := sync.WaitGroup{}
wg.Add(1)
var results map[string]error
pp := postprocessing.Postprocessing{
Steps: []postprocessing.Step{
postprocessing.NewStep("stepA", func() error {
pp := pp.Postprocessing{
Steps: []pp.Step{
pp.NewStep("stepA", func() error {
time.Sleep(_waitTime)
return errors.New("stepA failed")
}, nil),
postprocessing.NewStep("stepB", SuccessAfter(_waitTime), nil),
postprocessing.NewStep("stepC", func() error {
pp.NewStep("stepB", SuccessAfter(_waitTime), nil),
pp.NewStep("stepC", func() error {
time.Sleep(_waitTime)
return errors.New("stepC failed")
}, nil),
Expand Down

0 comments on commit 0c56367

Please sign in to comment.