Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

File uploaded event in dataprovider #2882

Merged
merged 11 commits into from
May 18, 2022
5 changes: 5 additions & 0 deletions changelog/unreleased/fix-file-uploaded-event.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Bugfix: Fix FileUploaded event being emitted too early

We fixed a problem where the FileUploaded event was emitted before the upload had actually finished.

https://github.com/cs3org/reva/pull/2882
4 changes: 0 additions & 4 deletions internal/grpc/interceptors/eventsmiddleware/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,6 @@ func NewUnary(m map[string]interface{}) (grpc.UnaryServerInterceptor, int, error
if isSuccess(v) {
ev = ContainerCreated(v, req.(*provider.CreateContainerRequest), executantID)
}
case *provider.InitiateFileUploadResponse:
if isSuccess(v) {
ev = FileUploaded(v, req.(*provider.InitiateFileUploadRequest), executantID)
}
case *provider.InitiateFileDownloadResponse:
if isSuccess(v) {
ev = FileDownloaded(v, req.(*provider.InitiateFileDownloadRequest))
Expand Down
4 changes: 4 additions & 0 deletions internal/grpc/services/storageprovider/storageprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,10 @@ func (s *service) InitiateFileUpload(ctx context.Context, req *provider.Initiate
metadata["mtime"] = string(req.Opaque.Map["X-OC-Mtime"].Value)
}
}

// pass on the provider it to be persisted with the upload info. that is required to correlate the upload with the proper provider later on
metadata["providerID"] = s.conf.MountID

uploadIDs, err := s.storage.InitiateUpload(ctx, req.Ref, uploadLength, metadata)
if err != nil {
var st *rpc.Status
Expand Down
32 changes: 23 additions & 9 deletions internal/http/services/dataprovider/dataprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@ import (
"net/http"

"github.com/cs3org/reva/v2/pkg/appctx"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/events/server"
datatxregistry "github.com/cs3org/reva/v2/pkg/rhttp/datatx/manager/registry"
"github.com/cs3org/reva/v2/pkg/rhttp/global"
"github.com/cs3org/reva/v2/pkg/rhttp/router"
"github.com/cs3org/reva/v2/pkg/storage"
"github.com/cs3org/reva/v2/pkg/storage/fs/registry"
"github.com/go-micro/plugins/v4/events/natsjs"
"github.com/mitchellh/mapstructure"
"github.com/rs/zerolog"
)
Expand All @@ -37,12 +40,14 @@ func init() {
}

type config struct {
Prefix string `mapstructure:"prefix" docs:"data;The prefix to be used for this HTTP service"`
Driver string `mapstructure:"driver" docs:"localhome;The storage driver to be used."`
Drivers map[string]map[string]interface{} `mapstructure:"drivers" docs:"url:pkg/storage/fs/localhome/localhome.go;The configuration for the storage driver"`
DataTXs map[string]map[string]interface{} `mapstructure:"data_txs" docs:"url:pkg/rhttp/datatx/manager/simple/simple.go;The configuration for the data tx protocols"`
Timeout int64 `mapstructure:"timeout"`
Insecure bool `mapstructure:"insecure"`
Prefix string `mapstructure:"prefix" docs:"data;The prefix to be used for this HTTP service"`
Driver string `mapstructure:"driver" docs:"localhome;The storage driver to be used."`
Drivers map[string]map[string]interface{} `mapstructure:"drivers" docs:"url:pkg/storage/fs/localhome/localhome.go;The configuration for the storage driver"`
DataTXs map[string]map[string]interface{} `mapstructure:"data_txs" docs:"url:pkg/rhttp/datatx/manager/simple/simple.go;The configuration for the data tx protocols"`
Timeout int64 `mapstructure:"timeout"`
Insecure bool `mapstructure:"insecure"`
NatsAddress string `mapstructure:"nats_address"`
NatsClusterID string `mapstructure:"nats_clusterID"`
}

func (c *config) init() {
Expand Down Expand Up @@ -75,7 +80,16 @@ func New(m map[string]interface{}, log *zerolog.Logger) (global.Service, error)
return nil, err
}

dataTXs, err := getDataTXs(conf, fs)
var publisher events.Publisher

if conf.NatsAddress != "" && conf.NatsClusterID != "" {
aduffeck marked this conversation as resolved.
Show resolved Hide resolved
publisher, err = server.NewNatsStream(natsjs.Address(conf.NatsAddress), natsjs.ClusterID(conf.NatsClusterID))
if err != nil {
return nil, err
}
}

dataTXs, err := getDataTXs(conf, fs, publisher)
if err != nil {
return nil, err
}
Expand All @@ -97,7 +111,7 @@ func getFS(c *config) (storage.FS, error) {
return nil, fmt.Errorf("driver not found: %s", c.Driver)
}

func getDataTXs(c *config, fs storage.FS) (map[string]http.Handler, error) {
func getDataTXs(c *config, fs storage.FS, publisher events.Publisher) (map[string]http.Handler, error) {
if c.DataTXs == nil {
c.DataTXs = make(map[string]map[string]interface{})
}
Expand All @@ -110,7 +124,7 @@ func getDataTXs(c *config, fs storage.FS) (map[string]http.Handler, error) {
txs := make(map[string]http.Handler)
for t := range c.DataTXs {
if f, ok := datatxregistry.NewFuncs[t]; ok {
if tx, err := f(c.DataTXs[t]); err == nil {
if tx, err := f(c.DataTXs[t], publisher); err == nil {
if handler, err := tx.Handler(fs); err == nil {
txs[t] = handler
}
Expand Down
18 changes: 18 additions & 0 deletions pkg/rhttp/datatx/datatx.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,28 @@ package datatx
import (
"net/http"

userv1beta1 "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/storage"
)

// DataTX provides an abstraction around various data transfer protocols.
type DataTX interface {
Handler(fs storage.FS) (http.Handler, error)
}

// EmitFileUploadedEvent is a helper function which publishes a FileUploaded event
func EmitFileUploadedEvent(owner *userv1beta1.UserId, ref *provider.Reference, publisher events.Publisher) error {
if ref == nil {
return nil
}

uploadedEv := events.FileUploaded{
Owner: owner,
Executant: owner,
Ref: ref,
}

return events.Publish(publisher, uploadedEv)
}
7 changes: 5 additions & 2 deletions pkg/rhttp/datatx/manager/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@

package registry

import "github.com/cs3org/reva/v2/pkg/rhttp/datatx"
import (
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/rhttp/datatx"
)

// NewFunc is the function that data transfer implementations
// should register at init time.
type NewFunc func(map[string]interface{}) (datatx.DataTX, error)
type NewFunc func(map[string]interface{}, events.Publisher) (datatx.DataTX, error)

// NewFuncs is a map containing all the registered data transfers.
var NewFuncs = map[string]NewFunc{}
Expand Down
24 changes: 17 additions & 7 deletions pkg/rhttp/datatx/manager/simple/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,18 @@ package simple
import (
"net/http"

"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"

userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/v2/pkg/appctx"
"github.com/cs3org/reva/v2/pkg/errtypes"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/rhttp/datatx"
"github.com/cs3org/reva/v2/pkg/rhttp/datatx/manager/registry"
"github.com/cs3org/reva/v2/pkg/rhttp/datatx/utils/download"
"github.com/cs3org/reva/v2/pkg/storage"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
)

func init() {
Expand All @@ -39,7 +42,8 @@ func init() {
type config struct{}

type manager struct {
conf *config
conf *config
publisher events.Publisher
}

func parseConfig(m map[string]interface{}) (*config, error) {
Expand All @@ -52,13 +56,16 @@ func parseConfig(m map[string]interface{}) (*config, error) {
}

// New returns a datatx manager implementation that relies on HTTP PUT/GET.
func New(m map[string]interface{}) (datatx.DataTX, error) {
func New(m map[string]interface{}, publisher events.Publisher) (datatx.DataTX, error) {
c, err := parseConfig(m)
if err != nil {
return nil, err
}

return &manager{conf: c}, nil
return &manager{
conf: c,
publisher: publisher,
}, nil
}

func (m *manager) Handler(fs storage.FS) (http.Handler, error) {
Expand All @@ -74,8 +81,11 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) {
defer r.Body.Close()

ref := &provider.Reference{Path: fn}

err := fs.Upload(ctx, ref, r.Body)
err := fs.Upload(ctx, ref, r.Body, func(owner *userpb.UserId, ref *provider.Reference) {
if err := datatx.EmitFileUploadedEvent(owner, ref, m.publisher); err != nil {
sublog.Error().Err(err).Msg("failed to publish FileUploaded event")
}
})
switch v := err.(type) {
case nil:
w.WriteHeader(http.StatusOK)
Expand Down
18 changes: 14 additions & 4 deletions pkg/rhttp/datatx/manager/spaces/spaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ import (
"path"
"strings"

userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/v2/pkg/appctx"
"github.com/cs3org/reva/v2/pkg/errtypes"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/rhttp/datatx"
"github.com/cs3org/reva/v2/pkg/rhttp/datatx/manager/registry"
"github.com/cs3org/reva/v2/pkg/rhttp/datatx/utils/download"
Expand All @@ -43,7 +45,8 @@ func init() {
type config struct{}

type manager struct {
conf *config
conf *config
publisher events.Publisher
}

func parseConfig(m map[string]interface{}) (*config, error) {
Expand All @@ -56,13 +59,16 @@ func parseConfig(m map[string]interface{}) (*config, error) {
}

// New returns a datatx manager implementation that relies on HTTP PUT/GET.
func New(m map[string]interface{}) (datatx.DataTX, error) {
func New(m map[string]interface{}, publisher events.Publisher) (datatx.DataTX, error) {
c, err := parseConfig(m)
if err != nil {
return nil, err
}

return &manager{conf: c}, nil
return &manager{
conf: c,
publisher: publisher,
}, nil
}

func (m *manager) Handler(fs storage.FS) (http.Handler, error) {
Expand All @@ -86,7 +92,11 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) {
ResourceId: &provider.ResourceId{StorageId: storageid, OpaqueId: opaqeid},
Path: fn,
}
err := fs.Upload(ctx, ref, r.Body)
err := fs.Upload(ctx, ref, r.Body, func(owner *userpb.UserId, ref *provider.Reference) {
if err := datatx.EmitFileUploadedEvent(owner, ref, m.publisher); err != nil {
sublog.Error().Err(err).Msg("failed to publish FileUploaded event")
}
})
switch v := err.(type) {
case nil:
w.WriteHeader(http.StatusOK)
Expand Down
46 changes: 41 additions & 5 deletions pkg/rhttp/datatx/manager/tus/tus.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,25 @@
package tus

import (
"context"
"net/http"
"path/filepath"

"github.com/pkg/errors"
tusd "github.com/tus/tusd/pkg/handler"

userv1beta1 "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/v2/pkg/appctx"
"github.com/cs3org/reva/v2/pkg/errtypes"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/rhttp/datatx"
"github.com/cs3org/reva/v2/pkg/rhttp/datatx/manager/registry"
"github.com/cs3org/reva/v2/pkg/rhttp/datatx/utils/download"
"github.com/cs3org/reva/v2/pkg/storage"
"github.com/cs3org/reva/v2/pkg/storagespace"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/mitchellh/mapstructure"
tusd "github.com/tus/tusd/pkg/handler"
)

func init() {
Expand All @@ -39,7 +47,8 @@ func init() {
type config struct{}

type manager struct {
conf *config
conf *config
publisher events.Publisher
}

func parseConfig(m map[string]interface{}) (*config, error) {
Expand All @@ -52,13 +61,16 @@ func parseConfig(m map[string]interface{}) (*config, error) {
}

// New returns a datatx manager implementation that relies on HTTP PUT/GET.
func New(m map[string]interface{}) (datatx.DataTX, error) {
func New(m map[string]interface{}, publisher events.Publisher) (datatx.DataTX, error) {
c, err := parseConfig(m)
if err != nil {
return nil, err
}

return &manager{conf: c}, nil
return &manager{
conf: c,
publisher: publisher,
}, nil
}

func (m *manager) Handler(fs storage.FS) (http.Handler, error) {
Expand All @@ -77,14 +89,38 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) {
composable.UseIn(composer)

config := tusd.Config{
StoreComposer: composer,
StoreComposer: composer,
NotifyCompleteUploads: true,
}

handler, err := tusd.NewUnroutedHandler(config)
if err != nil {
return nil, err
}

if m.publisher != nil {
go func() {
for {
ev := <-handler.CompleteUploads
info := ev.Upload
owner := &userv1beta1.UserId{
Idp: info.Storage["Idp"],
OpaqueId: info.Storage["UserId"],
}
ref := &provider.Reference{
ResourceId: &provider.ResourceId{
StorageId: storagespace.FormatStorageID(info.MetaData["providerID"], info.Storage["SpaceRoot"]),
OpaqueId: info.Storage["SpaceRoot"],
},
Path: utils.MakeRelativePath(filepath.Join(info.MetaData["dir"], info.MetaData["filename"])),
}
if err := datatx.EmitFileUploadedEvent(owner, ref, m.publisher); err != nil {
appctx.GetLogger(context.Background()).Error().Err(err).Msg("failed to publish FileUploaded event")
}
}
}()
}

h := handler.Middleware(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {

method := r.Method
Expand Down
1 change: 1 addition & 0 deletions pkg/storage/fs/cephfs/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func (fs *cephfs) InitiateUpload(ctx context.Context, ref *provider.Reference, u
}

if metadata != nil {
info.MetaData["providerID"] = metadata["providerID"]
if metadata["mtime"] != "" {
info.MetaData["mtime"] = metadata["mtime"]
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/fs/nextcloud/nextcloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ func (nc *StorageDriver) InitiateUpload(ctx context.Context, ref *provider.Refer
}

// Upload as defined in the storage.FS interface
func (nc *StorageDriver) Upload(ctx context.Context, ref *provider.Reference, r io.ReadCloser) error {
func (nc *StorageDriver) Upload(ctx context.Context, ref *provider.Reference, r io.ReadCloser, _ storage.UploadFinishedFunc) error {
return nc.doUpload(ctx, ref.Path, r)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/fs/nextcloud/nextcloud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ var _ = Describe("Nextcloud", func() {
}
stringReader := strings.NewReader("shiny!")
stringReadCloser := io.NopCloser(stringReader)
err := nc.Upload(ctx, ref, stringReadCloser)
err := nc.Upload(ctx, ref, stringReadCloser, nil)
Expect(err).ToNot(HaveOccurred())
checkCalled(called, `PUT /apps/sciencemesh/~tester/api/storage/Upload/some/file/path.txt shiny!`)
})
Expand Down
Loading