From 9cbee1a90617aed3416ce81d8827e7b402d90d0b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Duffeck?= Date: Tue, 17 May 2022 08:49:15 +0200 Subject: [PATCH 01/11] Store thr providerin in the tus upload info for later usage --- internal/grpc/services/storageprovider/storageprovider.go | 4 ++++ pkg/storage/fs/cephfs/upload.go | 1 + pkg/storage/fs/owncloudsql/upload.go | 1 + pkg/storage/utils/decomposedfs/upload.go | 2 +- pkg/storage/utils/localfs/upload.go | 1 + 5 files changed, 8 insertions(+), 1 deletion(-) diff --git a/internal/grpc/services/storageprovider/storageprovider.go b/internal/grpc/services/storageprovider/storageprovider.go index c21b56e21d..f64d4b8789 100644 --- a/internal/grpc/services/storageprovider/storageprovider.go +++ b/internal/grpc/services/storageprovider/storageprovider.go @@ -371,6 +371,10 @@ func (s *service) InitiateFileUpload(ctx context.Context, req *provider.Initiate metadata["mtime"] = string(req.Opaque.Map["X-OC-Mtime"].Value) } } + + // pass on the provider it to be persisted with the upload info. that is required to correlate the upload with the proper provider later on + metadata["providerID"] = s.conf.MountID + uploadIDs, err := s.storage.InitiateUpload(ctx, req.Ref, uploadLength, metadata) if err != nil { var st *rpc.Status diff --git a/pkg/storage/fs/cephfs/upload.go b/pkg/storage/fs/cephfs/upload.go index 87ff95f2b2..822249ff1e 100644 --- a/pkg/storage/fs/cephfs/upload.go +++ b/pkg/storage/fs/cephfs/upload.go @@ -111,6 +111,7 @@ func (fs *cephfs) InitiateUpload(ctx context.Context, ref *provider.Reference, u } if metadata != nil { + info.MetaData["providerID"] = metadata["providerID"] if metadata["mtime"] != "" { info.MetaData["mtime"] = metadata["mtime"] } diff --git a/pkg/storage/fs/owncloudsql/upload.go b/pkg/storage/fs/owncloudsql/upload.go index ace1688246..ac06e853d2 100644 --- a/pkg/storage/fs/owncloudsql/upload.go +++ b/pkg/storage/fs/owncloudsql/upload.go @@ -107,6 +107,7 @@ func (fs *owncloudsqlfs) InitiateUpload(ctx context.Context, ref *provider.Refer } if metadata != nil { + info.MetaData["providerID"] = metadata["providerID"] if metadata["mtime"] != "" { info.MetaData["mtime"] = metadata["mtime"] } diff --git a/pkg/storage/utils/decomposedfs/upload.go b/pkg/storage/utils/decomposedfs/upload.go index c0cdc086f8..a37eb8de02 100644 --- a/pkg/storage/utils/decomposedfs/upload.go +++ b/pkg/storage/utils/decomposedfs/upload.go @@ -99,7 +99,6 @@ func (fs *Decomposedfs) Upload(ctx context.Context, ref *provider.Reference, r i // TODO read optional content for small files in this request // TODO InitiateUpload (and Upload) needs a way to receive the expected checksum. Maybe in metadata as 'checksum' => 'sha1 aeosvp45w5xaeoe' = lowercase, space separated? func (fs *Decomposedfs) InitiateUpload(ctx context.Context, ref *provider.Reference, uploadLength int64, metadata map[string]string) (map[string]string, error) { - log := appctx.GetLogger(ctx) n, err := fs.lu.NodeFromResource(ctx, ref) @@ -129,6 +128,7 @@ func (fs *Decomposedfs) InitiateUpload(ctx context.Context, ref *provider.Refere } if metadata != nil { + info.MetaData["providerID"] = metadata["providerID"] if mtime, ok := metadata["mtime"]; ok { info.MetaData["mtime"] = mtime } diff --git a/pkg/storage/utils/localfs/upload.go b/pkg/storage/utils/localfs/upload.go index 2018780734..978faef368 100644 --- a/pkg/storage/utils/localfs/upload.go +++ b/pkg/storage/utils/localfs/upload.go @@ -99,6 +99,7 @@ func (fs *localfs) InitiateUpload(ctx context.Context, ref *provider.Reference, } if metadata != nil { + info.MetaData["providerID"] = metadata["providerID"] if metadata["mtime"] != "" { info.MetaData["mtime"] = metadata["mtime"] } From d5ec4159276e3c1dd10786237626aaba57b5c89f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Duffeck?= Date: Tue, 17 May 2022 08:50:25 +0200 Subject: [PATCH 02/11] Only fire FileUploaded events when the files has been actually uploaded We previously emitted them after the InitiateFileUpload call which could lead to the indexer trying to stat the file even though the upload hadn't been finished yet. --- .../interceptors/eventsmiddleware/events.go | 4 -- .../services/dataprovider/dataprovider.go | 32 ++++++++---- pkg/rhttp/datatx/manager/registry/registry.go | 7 ++- pkg/rhttp/datatx/manager/simple/simple.go | 11 ++-- pkg/rhttp/datatx/manager/spaces/spaces.go | 11 ++-- pkg/rhttp/datatx/manager/tus/tus.go | 51 +++++++++++++++++-- 6 files changed, 91 insertions(+), 25 deletions(-) diff --git a/internal/grpc/interceptors/eventsmiddleware/events.go b/internal/grpc/interceptors/eventsmiddleware/events.go index 72cf53179d..a57a0a1316 100644 --- a/internal/grpc/interceptors/eventsmiddleware/events.go +++ b/internal/grpc/interceptors/eventsmiddleware/events.go @@ -108,10 +108,6 @@ func NewUnary(m map[string]interface{}) (grpc.UnaryServerInterceptor, int, error if isSuccess(v) { ev = ContainerCreated(v, req.(*provider.CreateContainerRequest), executantID) } - case *provider.InitiateFileUploadResponse: - if isSuccess(v) { - ev = FileUploaded(v, req.(*provider.InitiateFileUploadRequest), executantID) - } case *provider.InitiateFileDownloadResponse: if isSuccess(v) { ev = FileDownloaded(v, req.(*provider.InitiateFileDownloadRequest)) diff --git a/internal/http/services/dataprovider/dataprovider.go b/internal/http/services/dataprovider/dataprovider.go index 2591ad80b7..a9dca3258c 100644 --- a/internal/http/services/dataprovider/dataprovider.go +++ b/internal/http/services/dataprovider/dataprovider.go @@ -23,11 +23,14 @@ import ( "net/http" "github.com/cs3org/reva/v2/pkg/appctx" + "github.com/cs3org/reva/v2/pkg/events" + "github.com/cs3org/reva/v2/pkg/events/server" datatxregistry "github.com/cs3org/reva/v2/pkg/rhttp/datatx/manager/registry" "github.com/cs3org/reva/v2/pkg/rhttp/global" "github.com/cs3org/reva/v2/pkg/rhttp/router" "github.com/cs3org/reva/v2/pkg/storage" "github.com/cs3org/reva/v2/pkg/storage/fs/registry" + "github.com/go-micro/plugins/v4/events/natsjs" "github.com/mitchellh/mapstructure" "github.com/rs/zerolog" ) @@ -37,12 +40,14 @@ func init() { } type config struct { - Prefix string `mapstructure:"prefix" docs:"data;The prefix to be used for this HTTP service"` - Driver string `mapstructure:"driver" docs:"localhome;The storage driver to be used."` - Drivers map[string]map[string]interface{} `mapstructure:"drivers" docs:"url:pkg/storage/fs/localhome/localhome.go;The configuration for the storage driver"` - DataTXs map[string]map[string]interface{} `mapstructure:"data_txs" docs:"url:pkg/rhttp/datatx/manager/simple/simple.go;The configuration for the data tx protocols"` - Timeout int64 `mapstructure:"timeout"` - Insecure bool `mapstructure:"insecure"` + Prefix string `mapstructure:"prefix" docs:"data;The prefix to be used for this HTTP service"` + Driver string `mapstructure:"driver" docs:"localhome;The storage driver to be used."` + Drivers map[string]map[string]interface{} `mapstructure:"drivers" docs:"url:pkg/storage/fs/localhome/localhome.go;The configuration for the storage driver"` + DataTXs map[string]map[string]interface{} `mapstructure:"data_txs" docs:"url:pkg/rhttp/datatx/manager/simple/simple.go;The configuration for the data tx protocols"` + Timeout int64 `mapstructure:"timeout"` + Insecure bool `mapstructure:"insecure"` + NatsAddress string `mapstructure:"nats_address"` + NatsClusterID string `mapstructure:"nats_clusterID"` } func (c *config) init() { @@ -75,7 +80,16 @@ func New(m map[string]interface{}, log *zerolog.Logger) (global.Service, error) return nil, err } - dataTXs, err := getDataTXs(conf, fs) + var publisher events.Publisher + + if conf.NatsAddress != "" && conf.NatsClusterID != "" { + publisher, err = server.NewNatsStream(natsjs.Address(conf.NatsAddress), natsjs.ClusterID(conf.NatsClusterID)) + if err != nil { + return nil, err + } + } + + dataTXs, err := getDataTXs(conf, fs, publisher) if err != nil { return nil, err } @@ -97,7 +111,7 @@ func getFS(c *config) (storage.FS, error) { return nil, fmt.Errorf("driver not found: %s", c.Driver) } -func getDataTXs(c *config, fs storage.FS) (map[string]http.Handler, error) { +func getDataTXs(c *config, fs storage.FS, publisher events.Publisher) (map[string]http.Handler, error) { if c.DataTXs == nil { c.DataTXs = make(map[string]map[string]interface{}) } @@ -110,7 +124,7 @@ func getDataTXs(c *config, fs storage.FS) (map[string]http.Handler, error) { txs := make(map[string]http.Handler) for t := range c.DataTXs { if f, ok := datatxregistry.NewFuncs[t]; ok { - if tx, err := f(c.DataTXs[t]); err == nil { + if tx, err := f(c.DataTXs[t], publisher); err == nil { if handler, err := tx.Handler(fs); err == nil { txs[t] = handler } diff --git a/pkg/rhttp/datatx/manager/registry/registry.go b/pkg/rhttp/datatx/manager/registry/registry.go index b30a657f0d..349126be92 100644 --- a/pkg/rhttp/datatx/manager/registry/registry.go +++ b/pkg/rhttp/datatx/manager/registry/registry.go @@ -18,11 +18,14 @@ package registry -import "github.com/cs3org/reva/v2/pkg/rhttp/datatx" +import ( + "github.com/cs3org/reva/v2/pkg/events" + "github.com/cs3org/reva/v2/pkg/rhttp/datatx" +) // NewFunc is the function that data transfer implementations // should register at init time. -type NewFunc func(map[string]interface{}) (datatx.DataTX, error) +type NewFunc func(map[string]interface{}, events.Publisher) (datatx.DataTX, error) // NewFuncs is a map containing all the registered data transfers. var NewFuncs = map[string]NewFunc{} diff --git a/pkg/rhttp/datatx/manager/simple/simple.go b/pkg/rhttp/datatx/manager/simple/simple.go index a7acc5dba2..c603391a87 100644 --- a/pkg/rhttp/datatx/manager/simple/simple.go +++ b/pkg/rhttp/datatx/manager/simple/simple.go @@ -24,6 +24,7 @@ import ( provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" "github.com/cs3org/reva/v2/pkg/appctx" "github.com/cs3org/reva/v2/pkg/errtypes" + "github.com/cs3org/reva/v2/pkg/events" "github.com/cs3org/reva/v2/pkg/rhttp/datatx" "github.com/cs3org/reva/v2/pkg/rhttp/datatx/manager/registry" "github.com/cs3org/reva/v2/pkg/rhttp/datatx/utils/download" @@ -39,7 +40,8 @@ func init() { type config struct{} type manager struct { - conf *config + conf *config + publisher events.Publisher } func parseConfig(m map[string]interface{}) (*config, error) { @@ -52,13 +54,16 @@ func parseConfig(m map[string]interface{}) (*config, error) { } // New returns a datatx manager implementation that relies on HTTP PUT/GET. -func New(m map[string]interface{}) (datatx.DataTX, error) { +func New(m map[string]interface{}, publisher events.Publisher) (datatx.DataTX, error) { c, err := parseConfig(m) if err != nil { return nil, err } - return &manager{conf: c}, nil + return &manager{ + conf: c, + publisher: publisher, + }, nil } func (m *manager) Handler(fs storage.FS) (http.Handler, error) { diff --git a/pkg/rhttp/datatx/manager/spaces/spaces.go b/pkg/rhttp/datatx/manager/spaces/spaces.go index c2373f9a6e..cc67515a7f 100644 --- a/pkg/rhttp/datatx/manager/spaces/spaces.go +++ b/pkg/rhttp/datatx/manager/spaces/spaces.go @@ -26,6 +26,7 @@ import ( provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" "github.com/cs3org/reva/v2/pkg/appctx" "github.com/cs3org/reva/v2/pkg/errtypes" + "github.com/cs3org/reva/v2/pkg/events" "github.com/cs3org/reva/v2/pkg/rhttp/datatx" "github.com/cs3org/reva/v2/pkg/rhttp/datatx/manager/registry" "github.com/cs3org/reva/v2/pkg/rhttp/datatx/utils/download" @@ -43,7 +44,8 @@ func init() { type config struct{} type manager struct { - conf *config + conf *config + publisher events.Publisher } func parseConfig(m map[string]interface{}) (*config, error) { @@ -56,13 +58,16 @@ func parseConfig(m map[string]interface{}) (*config, error) { } // New returns a datatx manager implementation that relies on HTTP PUT/GET. -func New(m map[string]interface{}) (datatx.DataTX, error) { +func New(m map[string]interface{}, publisher events.Publisher) (datatx.DataTX, error) { c, err := parseConfig(m) if err != nil { return nil, err } - return &manager{conf: c}, nil + return &manager{ + conf: c, + publisher: publisher, + }, nil } func (m *manager) Handler(fs storage.FS) (http.Handler, error) { diff --git a/pkg/rhttp/datatx/manager/tus/tus.go b/pkg/rhttp/datatx/manager/tus/tus.go index f70d07d585..201066d415 100644 --- a/pkg/rhttp/datatx/manager/tus/tus.go +++ b/pkg/rhttp/datatx/manager/tus/tus.go @@ -20,14 +20,21 @@ package tus import ( "net/http" + "path/filepath" "github.com/pkg/errors" + userv1beta1 "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" + providerv1beta1 "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" + "github.com/cs3org/reva/v2/pkg/appctx" "github.com/cs3org/reva/v2/pkg/errtypes" + "github.com/cs3org/reva/v2/pkg/events" "github.com/cs3org/reva/v2/pkg/rhttp/datatx" "github.com/cs3org/reva/v2/pkg/rhttp/datatx/manager/registry" "github.com/cs3org/reva/v2/pkg/rhttp/datatx/utils/download" "github.com/cs3org/reva/v2/pkg/storage" + "github.com/cs3org/reva/v2/pkg/storagespace" + "github.com/cs3org/reva/v2/pkg/utils" "github.com/mitchellh/mapstructure" tusd "github.com/tus/tusd/pkg/handler" ) @@ -39,7 +46,8 @@ func init() { type config struct{} type manager struct { - conf *config + conf *config + publisher events.Publisher } func parseConfig(m map[string]interface{}) (*config, error) { @@ -52,13 +60,16 @@ func parseConfig(m map[string]interface{}) (*config, error) { } // New returns a datatx manager implementation that relies on HTTP PUT/GET. -func New(m map[string]interface{}) (datatx.DataTX, error) { +func New(m map[string]interface{}, publisher events.Publisher) (datatx.DataTX, error) { c, err := parseConfig(m) if err != nil { return nil, err } - return &manager{conf: c}, nil + return &manager{ + conf: c, + publisher: publisher, + }, nil } func (m *manager) Handler(fs storage.FS) (http.Handler, error) { @@ -67,6 +78,8 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) { return nil, errtypes.NotSupported("file system does not support the tus protocol") } + sublog := appctx.GetLogger(ctx).With().Str("datatx", "spaces").Str("space", spaceID).Logger() + // A storage backend for tusd may consist of multiple different parts which // handle upload creation, locking, termination and so on. The composer is a // place where all those separated pieces are joined together. In this example @@ -77,7 +90,8 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) { composable.UseIn(composer) config := tusd.Config{ - StoreComposer: composer, + StoreComposer: composer, + NotifyCompleteUploads: true, } handler, err := tusd.NewUnroutedHandler(config) @@ -85,6 +99,35 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) { return nil, err } + if m.publisher != nil { + go func() { + for { + ev := <-handler.CompleteUploads + + u := ev.Upload + owner := &userv1beta1.UserId{ + Idp: u.Storage["Idp"], + OpaqueId: u.Storage["UserId"], + } + uploadedEv := events.FileUploaded{ + Owner: owner, + Executant: owner, + Ref: &providerv1beta1.Reference{ + ResourceId: &providerv1beta1.ResourceId{ + StorageId: storagespace.FormatStorageID(u.MetaData["providerID"], u.Storage["SpaceRoot"]), + OpaqueId: u.Storage["SpaceRoot"], + }, + Path: utils.MakeRelativePath(filepath.Join(u.MetaData["dir"], u.MetaData["filename"])), + }, + } + + if err := events.Publish(m.publisher, uploadedEv); err != nil { + sublog.Error().Err(err).Msg("failed to publish FileUploaded event") + } + } + }() + } + h := handler.Middleware(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { method := r.Method From 543933441d593014de20fae91bad7af1ac157d09 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Duffeck?= Date: Tue, 17 May 2022 12:59:12 +0200 Subject: [PATCH 03/11] Add a callback to Upload to indicate when an upload has finished The dataprovider uses this callback to emit the FileUploaded event which will be picked up by the search indexer, for example. --- pkg/rhttp/datatx/datatx.go | 17 ++++++++++++ pkg/rhttp/datatx/manager/simple/simple.go | 13 ++++++--- pkg/rhttp/datatx/manager/spaces/spaces.go | 7 ++++- pkg/rhttp/datatx/manager/tus/tus.go | 33 +++++++++-------------- pkg/storage/fs/nextcloud/nextcloud.go | 2 +- pkg/storage/fs/owncloudsql/upload.go | 3 ++- pkg/storage/fs/s3/s3.go | 2 +- pkg/storage/storage.go | 6 ++++- pkg/storage/utils/decomposedfs/upload.go | 26 ++++++++++++++++-- pkg/storage/utils/eosfs/upload.go | 3 ++- pkg/storage/utils/localfs/upload.go | 3 ++- 11 files changed, 82 insertions(+), 33 deletions(-) diff --git a/pkg/rhttp/datatx/datatx.go b/pkg/rhttp/datatx/datatx.go index 6e837bb8c2..02de6e227b 100644 --- a/pkg/rhttp/datatx/datatx.go +++ b/pkg/rhttp/datatx/datatx.go @@ -23,6 +23,9 @@ package datatx import ( "net/http" + userv1beta1 "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" + provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" + "github.com/cs3org/reva/v2/pkg/events" "github.com/cs3org/reva/v2/pkg/storage" ) @@ -30,3 +33,17 @@ import ( type DataTX interface { Handler(fs storage.FS) (http.Handler, error) } + +func EmitFileUploadedEvent(owner *userv1beta1.UserId, ref *provider.Reference, publisher events.Publisher) error { + if ref == nil { + return nil + } + + uploadedEv := events.FileUploaded{ + Owner: owner, + Executant: owner, + Ref: ref, + } + + return events.Publish(publisher, uploadedEv) +} diff --git a/pkg/rhttp/datatx/manager/simple/simple.go b/pkg/rhttp/datatx/manager/simple/simple.go index c603391a87..16f196cd97 100644 --- a/pkg/rhttp/datatx/manager/simple/simple.go +++ b/pkg/rhttp/datatx/manager/simple/simple.go @@ -21,6 +21,10 @@ package simple import ( "net/http" + "github.com/mitchellh/mapstructure" + "github.com/pkg/errors" + + userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" "github.com/cs3org/reva/v2/pkg/appctx" "github.com/cs3org/reva/v2/pkg/errtypes" @@ -29,8 +33,6 @@ import ( "github.com/cs3org/reva/v2/pkg/rhttp/datatx/manager/registry" "github.com/cs3org/reva/v2/pkg/rhttp/datatx/utils/download" "github.com/cs3org/reva/v2/pkg/storage" - "github.com/mitchellh/mapstructure" - "github.com/pkg/errors" ) func init() { @@ -79,8 +81,11 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) { defer r.Body.Close() ref := &provider.Reference{Path: fn} - - err := fs.Upload(ctx, ref, r.Body) + err := fs.Upload(ctx, ref, r.Body, func(owner *userpb.UserId, ref *provider.Reference) { + if err := datatx.EmitFileUploadedEvent(owner, ref, m.publisher); err != nil { + sublog.Error().Err(err).Msg("failed to publish FileUploaded event") + } + }) switch v := err.(type) { case nil: w.WriteHeader(http.StatusOK) diff --git a/pkg/rhttp/datatx/manager/spaces/spaces.go b/pkg/rhttp/datatx/manager/spaces/spaces.go index cc67515a7f..6245b66777 100644 --- a/pkg/rhttp/datatx/manager/spaces/spaces.go +++ b/pkg/rhttp/datatx/manager/spaces/spaces.go @@ -23,6 +23,7 @@ import ( "path" "strings" + userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" "github.com/cs3org/reva/v2/pkg/appctx" "github.com/cs3org/reva/v2/pkg/errtypes" @@ -91,7 +92,11 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) { ResourceId: &provider.ResourceId{StorageId: storageid, OpaqueId: opaqeid}, Path: fn, } - err := fs.Upload(ctx, ref, r.Body) + err := fs.Upload(ctx, ref, r.Body, func(owner *userpb.UserId, ref *provider.Reference) { + if err := datatx.EmitFileUploadedEvent(owner, ref, m.publisher); err != nil { + sublog.Error().Err(err).Msg("failed to publish FileUploaded event") + } + }) switch v := err.(type) { case nil: w.WriteHeader(http.StatusOK) diff --git a/pkg/rhttp/datatx/manager/tus/tus.go b/pkg/rhttp/datatx/manager/tus/tus.go index 201066d415..fbd5ba7e8d 100644 --- a/pkg/rhttp/datatx/manager/tus/tus.go +++ b/pkg/rhttp/datatx/manager/tus/tus.go @@ -19,13 +19,15 @@ package tus import ( + "context" "net/http" "path/filepath" "github.com/pkg/errors" + tusd "github.com/tus/tusd/pkg/handler" userv1beta1 "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" - providerv1beta1 "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" + provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" "github.com/cs3org/reva/v2/pkg/appctx" "github.com/cs3org/reva/v2/pkg/errtypes" "github.com/cs3org/reva/v2/pkg/events" @@ -36,7 +38,6 @@ import ( "github.com/cs3org/reva/v2/pkg/storagespace" "github.com/cs3org/reva/v2/pkg/utils" "github.com/mitchellh/mapstructure" - tusd "github.com/tus/tusd/pkg/handler" ) func init() { @@ -78,8 +79,6 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) { return nil, errtypes.NotSupported("file system does not support the tus protocol") } - sublog := appctx.GetLogger(ctx).With().Str("datatx", "spaces").Str("space", spaceID).Logger() - // A storage backend for tusd may consist of multiple different parts which // handle upload creation, locking, termination and so on. The composer is a // place where all those separated pieces are joined together. In this example @@ -103,26 +102,20 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) { go func() { for { ev := <-handler.CompleteUploads - - u := ev.Upload + info := ev.Upload owner := &userv1beta1.UserId{ - Idp: u.Storage["Idp"], - OpaqueId: u.Storage["UserId"], + Idp: info.Storage["Idp"], + OpaqueId: info.Storage["UserId"], } - uploadedEv := events.FileUploaded{ - Owner: owner, - Executant: owner, - Ref: &providerv1beta1.Reference{ - ResourceId: &providerv1beta1.ResourceId{ - StorageId: storagespace.FormatStorageID(u.MetaData["providerID"], u.Storage["SpaceRoot"]), - OpaqueId: u.Storage["SpaceRoot"], - }, - Path: utils.MakeRelativePath(filepath.Join(u.MetaData["dir"], u.MetaData["filename"])), + ref := &provider.Reference{ + ResourceId: &provider.ResourceId{ + StorageId: storagespace.FormatStorageID(info.MetaData["providerID"], info.Storage["SpaceRoot"]), + OpaqueId: info.Storage["SpaceRoot"], }, + Path: utils.MakeRelativePath(filepath.Join(info.MetaData["dir"], info.MetaData["filename"])), } - - if err := events.Publish(m.publisher, uploadedEv); err != nil { - sublog.Error().Err(err).Msg("failed to publish FileUploaded event") + if err := datatx.EmitFileUploadedEvent(owner, ref, m.publisher); err != nil { + appctx.GetLogger(context.Background()).Error().Err(err).Msg("failed to publish FileUploaded event") } } }() diff --git a/pkg/storage/fs/nextcloud/nextcloud.go b/pkg/storage/fs/nextcloud/nextcloud.go index ed994bb0c5..94cf4be949 100644 --- a/pkg/storage/fs/nextcloud/nextcloud.go +++ b/pkg/storage/fs/nextcloud/nextcloud.go @@ -395,7 +395,7 @@ func (nc *StorageDriver) InitiateUpload(ctx context.Context, ref *provider.Refer } // Upload as defined in the storage.FS interface -func (nc *StorageDriver) Upload(ctx context.Context, ref *provider.Reference, r io.ReadCloser) error { +func (nc *StorageDriver) Upload(ctx context.Context, ref *provider.Reference, r io.ReadCloser, _ storage.UploadFinishedFunc) error { return nc.doUpload(ctx, ref.Path, r) } diff --git a/pkg/storage/fs/owncloudsql/upload.go b/pkg/storage/fs/owncloudsql/upload.go index ac06e853d2..47bcd16af6 100644 --- a/pkg/storage/fs/owncloudsql/upload.go +++ b/pkg/storage/fs/owncloudsql/upload.go @@ -37,6 +37,7 @@ import ( "github.com/cs3org/reva/v2/pkg/errtypes" "github.com/cs3org/reva/v2/pkg/logger" "github.com/cs3org/reva/v2/pkg/mime" + "github.com/cs3org/reva/v2/pkg/storage" "github.com/cs3org/reva/v2/pkg/storage/utils/chunking" "github.com/cs3org/reva/v2/pkg/storage/utils/templates" "github.com/google/uuid" @@ -47,7 +48,7 @@ import ( var defaultFilePerm = os.FileMode(0664) -func (fs *owncloudsqlfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadCloser) error { +func (fs *owncloudsqlfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadCloser, _ storage.UploadFinishedFunc) error { upload, err := fs.GetUpload(ctx, ref.GetPath()) if err != nil { return errors.Wrap(err, "owncloudsql: error retrieving upload") diff --git a/pkg/storage/fs/s3/s3.go b/pkg/storage/fs/s3/s3.go index 53ac69e805..a174003bea 100644 --- a/pkg/storage/fs/s3/s3.go +++ b/pkg/storage/fs/s3/s3.go @@ -615,7 +615,7 @@ func (fs *s3FS) ListFolder(ctx context.Context, ref *provider.Reference, mdKeys return finfos, nil } -func (fs *s3FS) Upload(ctx context.Context, ref *provider.Reference, r io.ReadCloser) error { +func (fs *s3FS) Upload(ctx context.Context, ref *provider.Reference, r io.ReadCloser, _ storage.UploadFinishedFunc) error { log := appctx.GetLogger(ctx) fn, err := fs.resolve(ctx, ref) diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index 78d8791075..98068b715c 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -23,10 +23,14 @@ import ( "io" "net/url" + userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" registry "github.com/cs3org/go-cs3apis/cs3/storage/registry/v1beta1" ) +// UploadFinishedFunc is a callback function used in storage drivers to indicate that an upload has finished +type UploadFinishedFunc func(owner *userpb.UserId, ref *provider.Reference) + // FS is the interface to implement access to the storage. type FS interface { GetHome(ctx context.Context) (string, error) @@ -38,7 +42,7 @@ type FS interface { GetMD(ctx context.Context, ref *provider.Reference, mdKeys []string) (*provider.ResourceInfo, error) ListFolder(ctx context.Context, ref *provider.Reference, mdKeys []string) ([]*provider.ResourceInfo, error) InitiateUpload(ctx context.Context, ref *provider.Reference, uploadLength int64, metadata map[string]string) (map[string]string, error) - Upload(ctx context.Context, ref *provider.Reference, r io.ReadCloser) error + Upload(ctx context.Context, ref *provider.Reference, r io.ReadCloser, uploadFunc UploadFinishedFunc) error Download(ctx context.Context, ref *provider.Reference) (io.ReadCloser, error) ListRevisions(ctx context.Context, ref *provider.Reference) ([]*provider.FileVersion, error) DownloadRevision(ctx context.Context, ref *provider.Reference, key string) (io.ReadCloser, error) diff --git a/pkg/storage/utils/decomposedfs/upload.go b/pkg/storage/utils/decomposedfs/upload.go index a37eb8de02..b469f7b9d4 100644 --- a/pkg/storage/utils/decomposedfs/upload.go +++ b/pkg/storage/utils/decomposedfs/upload.go @@ -41,10 +41,12 @@ import ( ctxpkg "github.com/cs3org/reva/v2/pkg/ctx" "github.com/cs3org/reva/v2/pkg/errtypes" "github.com/cs3org/reva/v2/pkg/logger" + "github.com/cs3org/reva/v2/pkg/storage" "github.com/cs3org/reva/v2/pkg/storage/utils/chunking" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/xattrs" + "github.com/cs3org/reva/v2/pkg/storagespace" "github.com/cs3org/reva/v2/pkg/utils" "github.com/google/uuid" "github.com/pkg/errors" @@ -57,7 +59,7 @@ var defaultFilePerm = os.FileMode(0664) // Upload uploads data to the given resource // TODO Upload (and InitiateUpload) needs a way to receive the expected checksum. // Maybe in metadata as 'checksum' => 'sha1 aeosvp45w5xaeoe' = lowercase, space separated? -func (fs *Decomposedfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadCloser) (err error) { +func (fs *Decomposedfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadCloser, uff storage.UploadFinishedFunc) error { upload, err := fs.GetUpload(ctx, ref.GetPath()) if err != nil { return errors.Wrap(err, "Decomposedfs: error retrieving upload") @@ -92,7 +94,27 @@ func (fs *Decomposedfs) Upload(ctx context.Context, ref *provider.Reference, r i return errors.Wrap(err, "Decomposedfs: error writing to binary file") } - return uploadInfo.FinishUpload(ctx) + if err := uploadInfo.FinishUpload(ctx); err != nil { + return errors.Wrap(err, "Decomposedfs: error finishing upload") + } + + if uff != nil { + info := uploadInfo.info + uploadRef := &provider.Reference{ + ResourceId: &provider.ResourceId{ + StorageId: storagespace.FormatStorageID(info.MetaData["providerID"], info.Storage["SpaceRoot"]), + OpaqueId: info.Storage["SpaceRoot"], + }, + Path: utils.MakeRelativePath(filepath.Join(info.MetaData["dir"], info.MetaData["filename"])), + } + owner, ok := ctxpkg.ContextGetUser(uploadInfo.ctx) + if !ok { + return errtypes.PreconditionFailed("error getting user from uploadinfo context") + } + uff(owner.Id, uploadRef) + } + + return nil } // InitiateUpload returns upload ids corresponding to different protocols it supports diff --git a/pkg/storage/utils/eosfs/upload.go b/pkg/storage/utils/eosfs/upload.go index 9bae497ac0..71d498a6d7 100644 --- a/pkg/storage/utils/eosfs/upload.go +++ b/pkg/storage/utils/eosfs/upload.go @@ -26,11 +26,12 @@ import ( provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" "github.com/cs3org/reva/v2/pkg/errtypes" + "github.com/cs3org/reva/v2/pkg/storage" "github.com/cs3org/reva/v2/pkg/storage/utils/chunking" "github.com/pkg/errors" ) -func (fs *eosfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadCloser) error { +func (fs *eosfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadCloser, _ storage.UploadFinishedFunc) error { p, err := fs.resolve(ctx, ref) if err != nil { return errors.Wrap(err, "eos: error resolving reference") diff --git a/pkg/storage/utils/localfs/upload.go b/pkg/storage/utils/localfs/upload.go index 978faef368..fb6eac62fe 100644 --- a/pkg/storage/utils/localfs/upload.go +++ b/pkg/storage/utils/localfs/upload.go @@ -31,6 +31,7 @@ import ( "github.com/cs3org/reva/v2/pkg/appctx" ctxpkg "github.com/cs3org/reva/v2/pkg/ctx" "github.com/cs3org/reva/v2/pkg/errtypes" + "github.com/cs3org/reva/v2/pkg/storage" "github.com/cs3org/reva/v2/pkg/storage/utils/chunking" "github.com/cs3org/reva/v2/pkg/utils" "github.com/google/uuid" @@ -40,7 +41,7 @@ import ( var defaultFilePerm = os.FileMode(0664) -func (fs *localfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadCloser) error { +func (fs *localfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadCloser, _ storage.UploadFinishedFunc) error { upload, err := fs.GetUpload(ctx, ref.GetPath()) if err != nil { return errors.Wrap(err, "localfs: error retrieving upload") From 9ed9350cc5ec0059c137b86d9ef6cc3877044c85 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Duffeck?= Date: Wed, 18 May 2022 09:21:53 +0200 Subject: [PATCH 04/11] Fix tests --- pkg/storage/fs/nextcloud/nextcloud_test.go | 2 +- pkg/storage/utils/decomposedfs/upload_test.go | 14 +++++++------- tests/helpers/helpers.go | 2 +- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/pkg/storage/fs/nextcloud/nextcloud_test.go b/pkg/storage/fs/nextcloud/nextcloud_test.go index c65269cd93..4852eb3122 100644 --- a/pkg/storage/fs/nextcloud/nextcloud_test.go +++ b/pkg/storage/fs/nextcloud/nextcloud_test.go @@ -447,7 +447,7 @@ var _ = Describe("Nextcloud", func() { } stringReader := strings.NewReader("shiny!") stringReadCloser := io.NopCloser(stringReader) - err := nc.Upload(ctx, ref, stringReadCloser) + err := nc.Upload(ctx, ref, stringReadCloser, nil) Expect(err).ToNot(HaveOccurred()) checkCalled(called, `PUT /apps/sciencemesh/~tester/api/storage/Upload/some/file/path.txt shiny!`) }) diff --git a/pkg/storage/utils/decomposedfs/upload_test.go b/pkg/storage/utils/decomposedfs/upload_test.go index 842410fb8f..0d13917e57 100644 --- a/pkg/storage/utils/decomposedfs/upload_test.go +++ b/pkg/storage/utils/decomposedfs/upload_test.go @@ -231,7 +231,7 @@ var _ = Describe("File uploads", func() { uploadRef := &provider.Reference{Path: "/" + uploadIds["simple"]} - bs.On("Upload", mock.AnythingOfType("*node.Node"), mock.AnythingOfType("*os.File")). + bs.On("Upload", mock.AnythingOfType("*node.Node"), mock.AnythingOfType("*os.File"), mock.Anything). Return(nil). Run(func(args mock.Arguments) { reader := args.Get(1).(io.Reader) @@ -241,10 +241,10 @@ var _ = Describe("File uploads", func() { Expect(data).To(Equal([]byte("0123456789"))) }) - err = fs.Upload(ctx, uploadRef, ioutil.NopCloser(bytes.NewReader(fileContent))) + err = fs.Upload(ctx, uploadRef, ioutil.NopCloser(bytes.NewReader(fileContent)), nil) Expect(err).ToNot(HaveOccurred()) - bs.AssertCalled(GinkgoT(), "Upload", mock.Anything, mock.Anything) + bs.AssertCalled(GinkgoT(), "Upload", mock.Anything, mock.Anything, mock.Anything) resources, err := fs.ListFolder(ctx, rootRef, []string{}) @@ -269,7 +269,7 @@ var _ = Describe("File uploads", func() { uploadRef := &provider.Reference{Path: "/" + uploadIds["simple"]} - bs.On("Upload", mock.AnythingOfType("*node.Node"), mock.AnythingOfType("*os.File")). + bs.On("Upload", mock.AnythingOfType("*node.Node"), mock.AnythingOfType("*os.File"), mock.Anything). Return(nil). Run(func(args mock.Arguments) { reader := args.Get(1).(io.Reader) @@ -279,10 +279,10 @@ var _ = Describe("File uploads", func() { Expect(data).To(Equal([]byte(""))) }) - err = fs.Upload(ctx, uploadRef, ioutil.NopCloser(bytes.NewReader(fileContent))) + err = fs.Upload(ctx, uploadRef, ioutil.NopCloser(bytes.NewReader(fileContent)), nil) Expect(err).ToNot(HaveOccurred()) - bs.AssertCalled(GinkgoT(), "Upload", mock.Anything, mock.Anything) + bs.AssertCalled(GinkgoT(), "Upload", mock.Anything, mock.Anything, mock.Anything) resources, err := fs.ListFolder(ctx, rootRef, []string{}) @@ -299,7 +299,7 @@ var _ = Describe("File uploads", func() { ) uploadRef := &provider.Reference{Path: "/some-non-existent-upload-reference"} - err := fs.Upload(ctx, uploadRef, ioutil.NopCloser(bytes.NewReader(fileContent))) + err := fs.Upload(ctx, uploadRef, ioutil.NopCloser(bytes.NewReader(fileContent)), nil) Expect(err).To(HaveOccurred()) diff --git a/tests/helpers/helpers.go b/tests/helpers/helpers.go index b1516f02f4..b4f5c7d3e1 100644 --- a/tests/helpers/helpers.go +++ b/tests/helpers/helpers.go @@ -62,6 +62,6 @@ func Upload(ctx context.Context, fs storage.FS, ref *provider.Reference, content return errors.New("simple upload method not available") } uploadRef := &provider.Reference{Path: "/" + uploadID} - err = fs.Upload(ctx, uploadRef, ioutil.NopCloser(bytes.NewReader(content))) + err = fs.Upload(ctx, uploadRef, ioutil.NopCloser(bytes.NewReader(content)), nil) return err } From 18fa110f1d2b013bf5509475d37198e9290fd5b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Duffeck?= Date: Wed, 18 May 2022 11:00:18 +0200 Subject: [PATCH 05/11] Add changelog --- changelog/unreleased/fix-file-uploaded-event.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 changelog/unreleased/fix-file-uploaded-event.md diff --git a/changelog/unreleased/fix-file-uploaded-event.md b/changelog/unreleased/fix-file-uploaded-event.md new file mode 100644 index 0000000000..4000687bda --- /dev/null +++ b/changelog/unreleased/fix-file-uploaded-event.md @@ -0,0 +1,5 @@ +Bugfix: Fix FileUploaded event being emitted too early + +We fixed a problem where the FileUploaded event was emitted before the upload had actually finished. + +https://github.com/cs3org/reva/pull/2882 From 5b1a66da00f4fcc11675f07488752d3365e53b84 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Duffeck?= Date: Wed, 18 May 2022 11:05:31 +0200 Subject: [PATCH 06/11] Make hound happy --- pkg/rhttp/datatx/datatx.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/rhttp/datatx/datatx.go b/pkg/rhttp/datatx/datatx.go index 02de6e227b..980e7ce8c3 100644 --- a/pkg/rhttp/datatx/datatx.go +++ b/pkg/rhttp/datatx/datatx.go @@ -34,6 +34,7 @@ type DataTX interface { Handler(fs storage.FS) (http.Handler, error) } +// EmitFileUploadedEvent is a helper function which publishes a FileUploaded event func EmitFileUploadedEvent(owner *userv1beta1.UserId, ref *provider.Reference, publisher events.Publisher) error { if ref == nil { return nil From 72d4216700f6829dc87901001a7d0f9e1dfdc187 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Duffeck?= Date: Wed, 18 May 2022 11:15:50 +0200 Subject: [PATCH 07/11] Do not try to publish on a nil publisher --- pkg/rhttp/datatx/datatx.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/rhttp/datatx/datatx.go b/pkg/rhttp/datatx/datatx.go index 980e7ce8c3..ba8897a15c 100644 --- a/pkg/rhttp/datatx/datatx.go +++ b/pkg/rhttp/datatx/datatx.go @@ -36,7 +36,7 @@ type DataTX interface { // EmitFileUploadedEvent is a helper function which publishes a FileUploaded event func EmitFileUploadedEvent(owner *userv1beta1.UserId, ref *provider.Reference, publisher events.Publisher) error { - if ref == nil { + if ref == nil || publisher == nil { return nil } From 85c720459c3ae36f16eaa9e9a8888330814638db Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Duffeck?= Date: Wed, 18 May 2022 11:24:07 +0200 Subject: [PATCH 08/11] Fix nextcloud integration tests --- pkg/storage/fs/nextcloud/nextcloud_server_mock.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/storage/fs/nextcloud/nextcloud_server_mock.go b/pkg/storage/fs/nextcloud/nextcloud_server_mock.go index 06b71528f4..dcfe249960 100644 --- a/pkg/storage/fs/nextcloud/nextcloud_server_mock.go +++ b/pkg/storage/fs/nextcloud/nextcloud_server_mock.go @@ -111,7 +111,7 @@ var responses = map[string]Response{ `POST /apps/sciencemesh/~f7fbf8c8-139b-4376-b307-cf0a8c2d0d9c/api/storage/GetPathByID {"storage_id":"00000000-0000-0000-0000-000000000000","opaque_id":"fileid-/some/path"} EMPTY`: {200, "/subdir", serverStateEmpty}, - `POST /apps/sciencemesh/~f7fbf8c8-139b-4376-b307-cf0a8c2d0d9c/api/storage/InitiateUpload {"ref":{"path":"/file"},"uploadLength":0,"metadata":{}}`: {200, `{"simple": "yes","tus": "yes"}`, serverStateEmpty}, + `POST /apps/sciencemesh/~f7fbf8c8-139b-4376-b307-cf0a8c2d0d9c/api/storage/InitiateUpload {"ref":{"path":"/file"},"uploadLength":0,"metadata":{"providerID":""}}`: {200, `{"simple": "yes","tus": "yes"}`, serverStateEmpty}, `POST /apps/sciencemesh/~f7fbf8c8-139b-4376-b307-cf0a8c2d0d9c/api/storage/InitiateUpload {"ref":{"resource_id":{"storage_id":"f7fbf8c8-139b-4376-b307-cf0a8c2d0d9c"},"path":"/versionedFile"},"uploadLength":0,"metadata":{}}`: {200, `{"simple": "yes","tus": "yes"}`, serverStateEmpty}, `POST /apps/sciencemesh/~f7fbf8c8-139b-4376-b307-cf0a8c2d0d9c/api/storage/ListFolder {"ref":{"path":"/"},"mdKeys":null}`: {200, `[{"opaque":{},"type":2,"id":{"opaque_id":"fileid-/subdir"},"checksum":{},"etag":"deadbeef","mime_type":"text/plain","mtime":{"seconds":1234567890},"path":"/subdir","permission_set":{},"size":12345,"canonical_metadata":{},"owner":{"opaque_id":"f7fbf8c8-139b-4376-b307-cf0a8c2d0d9c"},"arbitrary_metadata":{"metadata":{"da":"ta","some":"arbi","trary":"meta"}}}]`, serverStateEmpty}, From 2dc6ca9b8736fecdaa28c40d58e20ff1ae7ffac8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Duffeck?= Date: Wed, 18 May 2022 11:56:38 +0200 Subject: [PATCH 09/11] Do not wrap the errors from FinishUpload --- pkg/storage/utils/decomposedfs/upload.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/storage/utils/decomposedfs/upload.go b/pkg/storage/utils/decomposedfs/upload.go index b469f7b9d4..5803a1d5f6 100644 --- a/pkg/storage/utils/decomposedfs/upload.go +++ b/pkg/storage/utils/decomposedfs/upload.go @@ -95,7 +95,7 @@ func (fs *Decomposedfs) Upload(ctx context.Context, ref *provider.Reference, r i } if err := uploadInfo.FinishUpload(ctx); err != nil { - return errors.Wrap(err, "Decomposedfs: error finishing upload") + return err } if uff != nil { From 7cc20b251baed53c04b710620a01ea1ca7802562 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Duffeck?= Date: Wed, 18 May 2022 12:47:00 +0200 Subject: [PATCH 10/11] Log a warning if the nats configuration is missing --- internal/http/services/dataprovider/dataprovider.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/internal/http/services/dataprovider/dataprovider.go b/internal/http/services/dataprovider/dataprovider.go index a9dca3258c..bdf76de63e 100644 --- a/internal/http/services/dataprovider/dataprovider.go +++ b/internal/http/services/dataprovider/dataprovider.go @@ -82,7 +82,9 @@ func New(m map[string]interface{}, log *zerolog.Logger) (global.Service, error) var publisher events.Publisher - if conf.NatsAddress != "" && conf.NatsClusterID != "" { + if conf.NatsAddress == "" || conf.NatsClusterID == "" { + log.Warn().Msg("missing or incomplete nats configuration. Events will not be published.") + } else { publisher, err = server.NewNatsStream(natsjs.Address(conf.NatsAddress), natsjs.ClusterID(conf.NatsClusterID)) if err != nil { return nil, err From 73622dc82dc765f4f2b13ec548e7cb367b5b9624 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Duffeck?= Date: Wed, 18 May 2022 17:09:01 +0200 Subject: [PATCH 11/11] Handle nil publisher --- pkg/rhttp/datatx/manager/tus/tus.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/rhttp/datatx/manager/tus/tus.go b/pkg/rhttp/datatx/manager/tus/tus.go index fbd5ba7e8d..0a55099394 100644 --- a/pkg/rhttp/datatx/manager/tus/tus.go +++ b/pkg/rhttp/datatx/manager/tus/tus.go @@ -88,9 +88,10 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) { // let the composable storage tell tus which extensions it supports composable.UseIn(composer) + publishEvents := m.publisher != nil config := tusd.Config{ StoreComposer: composer, - NotifyCompleteUploads: true, + NotifyCompleteUploads: publishEvents, } handler, err := tusd.NewUnroutedHandler(config) @@ -98,7 +99,7 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) { return nil, err } - if m.publisher != nil { + if publishEvents { go func() { for { ev := <-handler.CompleteUploads @@ -122,7 +123,6 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) { } h := handler.Middleware(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - method := r.Method // https://github.com/tus/tus-resumable-upload-protocol/blob/master/protocol.md#x-http-method-override if r.Header.Get("X-HTTP-Method-Override") != "" {