Skip to content

Commit

Permalink
Merge pull request #10476 from owncloud/concurrent-autoaccept
Browse files Browse the repository at this point in the history
concurrent autoaccept
  • Loading branch information
butonic authored Nov 6, 2024
2 parents 28e8a84 + 42711ae commit af806dc
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 37 deletions.
5 changes: 5 additions & 0 deletions changelog/unreleased/concurrent-autoaccept.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Enhancement: Concurrent autoaccept for shares

Shares for groups are now concurrently accepted. Tha default of 25 goroutinges can be changed with the new `FRONTEND_MAX_CONCURRENCY` environment variable.

https://github.com/owncloud/ocis/pull/10476
91 changes: 57 additions & 34 deletions services/frontend/pkg/command/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,13 @@ package command
import (
"context"
"errors"
"sync"

gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
group "github.com/cs3org/go-cs3apis/cs3/identity/group/v1beta1"
user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
collaboration "github.com/cs3org/go-cs3apis/cs3/sharing/collaboration/v1beta1"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/events/stream"
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
Expand All @@ -17,16 +23,9 @@ import (
"github.com/owncloud/ocis/v2/ocis-pkg/registry"
"github.com/owncloud/ocis/v2/ocis-pkg/service/grpc"
"github.com/owncloud/ocis/v2/ocis-pkg/tracing"
settingssvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/settings/v0"
"github.com/owncloud/ocis/v2/services/frontend/pkg/config"
"github.com/owncloud/ocis/v2/services/settings/pkg/store/defaults"

gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
group "github.com/cs3org/go-cs3apis/cs3/identity/group/v1beta1"
user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
collaboration "github.com/cs3org/go-cs3apis/cs3/sharing/collaboration/v1beta1"

settingssvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/settings/v0"
)

var _registeredEvents = []events.Unmarshaller{
Expand Down Expand Up @@ -90,7 +89,7 @@ func ListenForEvents(ctx context.Context, cfg *config.Config, l log.Logger) erro
default:
l.Error().Interface("event", e).Msg("unhandled event")
case events.ShareCreated:
AutoAcceptShares(ev, cfg.AutoAcceptShares, l, gatewaySelector, valueService, cfg.ServiceAccount)
AutoAcceptShares(ev, cfg.AutoAcceptShares, l, gatewaySelector, valueService, cfg.ServiceAccount, cfg.MaxConcurrency)
}
case <-ctx.Done():
l.Info().Msg("context cancelled")
Expand All @@ -100,7 +99,7 @@ func ListenForEvents(ctx context.Context, cfg *config.Config, l log.Logger) erro
}

// AutoAcceptShares automatically accepts shares if configured by the admin or user
func AutoAcceptShares(ev events.ShareCreated, autoAcceptDefault bool, l log.Logger, gatewaySelector pool.Selectable[gateway.GatewayAPIClient], vs settingssvc.ValueService, cfg config.ServiceAccount) {
func AutoAcceptShares(ev events.ShareCreated, autoAcceptDefault bool, l log.Logger, gatewaySelector pool.Selectable[gateway.GatewayAPIClient], vs settingssvc.ValueService, cfg config.ServiceAccount, maxConcurrency int) {
gwc, err := gatewaySelector.Next()
if err != nil {
l.Error().Err(err).Msg("cannot get gateway client")
Expand All @@ -118,36 +117,60 @@ func AutoAcceptShares(ev events.ShareCreated, autoAcceptDefault bool, l log.Logg
return
}

for _, uid := range userIDs {
if !autoAcceptShares(ctx, uid, autoAcceptDefault, vs) {
continue
}
work := make(chan *user.UserId, len(userIDs))

gwc, err := gatewaySelector.Next()
if err != nil {
l.Error().Err(err).Msg("cannot get gateway client")
continue
}
resp, err := gwc.UpdateReceivedShare(ctx, updateShareRequest(ev.ShareID, uid))
if err != nil {
l.Error().Err(err).Msg("error sending grpc request")
continue
// Distribute work
go func() {
defer close(work)
for _, userID := range userIDs {
select {
case work <- userID:
case <-ctx.Done():
return
}
}
}()

wg := sync.WaitGroup{}
for i := 0; i < maxConcurrency; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for userID := range work {

if code := resp.GetStatus().GetCode(); code != rpc.Code_CODE_OK {
// log unexpected status codes if a share cannot be accepted...
func() *zerolog.Event {
switch code {
// ... not found is not an error in the context of auto-accepting shares
case rpc.Code_CODE_NOT_FOUND:
return l.Debug()
default:
return l.Error()
if !autoAcceptShares(ctx, userID, autoAcceptDefault, vs) {
continue
}
}().Interface("status", resp.GetStatus()).Str("userid", uid.GetOpaqueId()).Msg("unexpected status code while accepting share")
}

gwc, err := gatewaySelector.Next()
if err != nil {
l.Error().Err(err).Msg("cannot get gateway client")
continue
}
resp, err := gwc.UpdateReceivedShare(ctx, updateShareRequest(ev.ShareID, userID))
if err != nil {
l.Error().Err(err).Msg("error sending grpc request")
continue
}

if code := resp.GetStatus().GetCode(); code != rpc.Code_CODE_OK {
// log unexpected status codes if a share cannot be accepted...
func() *zerolog.Event {
switch code {
// ... not found is not an error in the context of auto-accepting shares
case rpc.Code_CODE_NOT_FOUND:
return l.Debug()
default:
return l.Error()
}
}().Interface("status", resp.GetStatus()).Str("userid", userID.GetOpaqueId()).Msg("unexpected status code while accepting share")
}
}
}()
}

// Wait for all goroutines to finish
wg.Wait()
}

func getUserIDs(ctx context.Context, gatewaySelector pool.Selectable[gateway.GatewayAPIClient], uid *user.UserId, gid *group.GroupId) ([]*user.UserId, error) {
Expand Down
4 changes: 2 additions & 2 deletions services/frontend/pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ type Config struct {
DisableSSE bool `yaml:"disable_sse" env:"OCIS_DISABLE_SSE;FRONTEND_DISABLE_SSE" desc:"When set to true, clients are informed that the Server-Sent Events endpoint is not accessible." introductionVersion:"pre5.0"`
DefaultLinkPermissions int `yaml:"default_link_permissions" env:"FRONTEND_DEFAULT_LINK_PERMISSIONS" desc:"Defines the default permissions a link is being created with. Possible values are 0 (= internal link, for instance members only) and 1 (= public link with viewer permissions). Defaults to 1." introductionVersion:"5.0"`

PublicURL string `yaml:"public_url" env:"OCIS_URL;FRONTEND_PUBLIC_URL" desc:"The public facing URL of the oCIS frontend." introductionVersion:"pre5.0"`

PublicURL string `yaml:"public_url" env:"OCIS_URL;FRONTEND_PUBLIC_URL" desc:"The public facing URL of the oCIS frontend." introductionVersion:"pre5.0"`
MaxConcurrency int `yaml:"max_concurrency" env:"OCIS_MAX_CONCURRENCY;FRONTEND_MAX_CONCURRENCY" desc:"Maximum number of concurrent go-routines. Higher values can potentially get work done faster but will also cause more load on the system. Values of 0 or below will be ignored and the default value will be used." introductionVersion:"7.0.0"`
AppHandler AppHandler `yaml:"app_handler"`
Archiver Archiver `yaml:"archiver"`
DataGateway DataGateway `yaml:"data_gateway"`
Expand Down
5 changes: 4 additions & 1 deletion services/frontend/pkg/config/defaults/defaultconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ func DefaultConfig() *config.Config {
Cluster: "ocis-cluster",
EnableTLS: false,
},
MaxConcurrency: 25,
PasswordPolicy: config.PasswordPolicy{
MinCharacters: 8,
MinLowerCaseCharacters: 1,
Expand Down Expand Up @@ -195,5 +196,7 @@ func EnsureDefaults(cfg *config.Config) {

// Sanitize sanitized the configuration
func Sanitize(cfg *config.Config) {
// nothing to sanitize here atm
if cfg.MaxConcurrency <= 0 {
cfg.MaxConcurrency = 5
}
}

0 comments on commit af806dc

Please sign in to comment.