Skip to content

Commit

Permalink
Merge pull request #4449 from butonic/keep-failed-processing-status
Browse files Browse the repository at this point in the history
keep failed processing status
  • Loading branch information
butonic authored Jan 9, 2024
2 parents 46b7156 + e767c44 commit 00f8d44
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 21 deletions.
5 changes: 5 additions & 0 deletions changelog/unreleased/keep-failed-processing-status.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Bugfix: Keep failed processing status

We now keep the postprocessing status when a blob could not be copied to the blobstore.

https://github.com/cs3org/reva/pull/4449
11 changes: 7 additions & 4 deletions pkg/storage/utils/decomposedfs/decomposedfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ type SessionStore interface {
New(ctx context.Context) *upload.OcisSession
List(ctx context.Context) ([]*upload.OcisSession, error)
Get(ctx context.Context, id string) (*upload.OcisSession, error)
Cleanup(ctx context.Context, session upload.Session, failure bool, keepUpload bool)
Cleanup(ctx context.Context, session upload.Session, revertNodeMetadata, keepUpload, unmarkPostprocessing bool)
}

// Decomposedfs provides the base for decomposed filesystem implementations
Expand Down Expand Up @@ -281,14 +281,15 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {
}
if !n.Exists {
log.Debug().Str("uploadID", ev.UploadID).Str("nodeID", session.NodeID()).Msg("node no longer exists")
fs.sessionStore.Cleanup(ctx, session, false, false)
fs.sessionStore.Cleanup(ctx, session, false, false, false)
continue
}

var (
failed bool
keepUpload bool
)
unmarkPostprocessing := true

switch ev.Outcome {
default:
Expand All @@ -301,8 +302,10 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {
case events.PPOutcomeContinue:
if err := session.Finalize(); err != nil {
log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not finalize upload")
keepUpload = true // should we keep the upload when assembling failed?
failed = true
keepUpload = true
// keep postprocessing status so the upload is not deleted during housekeeping
unmarkPostprocessing = false
} else {
metrics.UploadSessionsFinalized.Inc()
}
Expand Down Expand Up @@ -334,7 +337,7 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {
}
}

fs.sessionStore.Cleanup(ctx, session, failed, keepUpload)
fs.sessionStore.Cleanup(ctx, session, failed, keepUpload, unmarkPostprocessing)

// remove cache entry in gateway
fs.cache.RemoveStatContext(ctx, ev.ExecutingUser.GetId(), &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID})
Expand Down
26 changes: 14 additions & 12 deletions pkg/storage/utils/decomposedfs/upload/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,25 +150,27 @@ type Session interface {
ID() string
Node(ctx context.Context) (*node.Node, error)
Context(ctx context.Context) context.Context
Cleanup(cleanNode, cleanBin, cleanInfo bool)
Cleanup(revertNodeMetadata, cleanBin, cleanInfo bool)
}

// Cleanup cleans upload metadata, binary data and processing status as necessary
func (store OcisStore) Cleanup(ctx context.Context, session Session, failure bool, keepUpload bool) {
func (store OcisStore) Cleanup(ctx context.Context, session Session, revertNodeMetadata, keepUpload, unmarkPostprocessing bool) {
ctx, span := tracer.Start(session.Context(ctx), "Cleanup")
defer span.End()
session.Cleanup(failure, !keepUpload, !keepUpload)
session.Cleanup(revertNodeMetadata, !keepUpload, !keepUpload)

// unset processing status
n, err := session.Node(ctx)
if err != nil {
appctx.GetLogger(ctx).Info().Str("session", session.ID()).Err(err).Msg("could not read node")
return
}
// FIXME: after cleanup the node might already be deleted ...
if n != nil { // node can be nil when there was an error before it was created (eg. checksum-mismatch)
if err := n.UnmarkProcessing(ctx, session.ID()); err != nil {
appctx.GetLogger(ctx).Info().Str("path", n.InternalPath()).Err(err).Msg("unmarking processing failed")
if unmarkPostprocessing {
n, err := session.Node(ctx)
if err != nil {
appctx.GetLogger(ctx).Info().Str("session", session.ID()).Err(err).Msg("could not read node")
return
}
// FIXME: after cleanup the node might already be deleted ...
if n != nil { // node can be nil when there was an error before it was created (eg. checksum-mismatch)
if err := n.UnmarkProcessing(ctx, session.ID()); err != nil {
appctx.GetLogger(ctx).Info().Str("path", n.InternalPath()).Err(err).Msg("unmarking processing failed")
}
}
}
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/storage/utils/decomposedfs/upload/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func (session *OcisSession) FinishUpload(ctx context.Context) error {
err = errtypes.BadRequest("unsupported checksum algorithm: " + parts[0])
}
if err != nil {
session.store.Cleanup(ctx, session, true, false)
session.store.Cleanup(ctx, session, true, false, false)
return err
}
}
Expand All @@ -191,7 +191,7 @@ func (session *OcisSession) FinishUpload(ctx context.Context) error {

n, err := session.store.CreateNodeForUpload(session, attrs)
if err != nil {
session.store.Cleanup(ctx, session, true, false)
session.store.Cleanup(ctx, session, true, false, true)
return err
}

Expand Down Expand Up @@ -223,7 +223,7 @@ func (session *OcisSession) FinishUpload(ctx context.Context) error {
if !session.store.async {
// handle postprocessing synchronously
err = session.Finalize()
session.store.Cleanup(ctx, session, err != nil, false)
session.store.Cleanup(ctx, session, err != nil, false, err == nil)
if err != nil {
log.Error().Err(err).Msg("failed to upload")
return err
Expand Down Expand Up @@ -312,10 +312,10 @@ func (session *OcisSession) removeNode(ctx context.Context) {
}

// cleanup cleans up after the upload is finished
func (session *OcisSession) Cleanup(cleanNode, cleanBin, cleanInfo bool) {
func (session *OcisSession) Cleanup(revertNodeMetadata, cleanBin, cleanInfo bool) {
ctx := session.Context(context.Background())

if cleanNode {
if revertNodeMetadata {
if session.NodeExists() {
p := session.info.MetaData["versionsPath"]
n, err := session.Node(ctx)
Expand Down

0 comments on commit 00f8d44

Please sign in to comment.