Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Performance tuning #3932

Draft
wants to merge 18 commits into
base: edge
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .bingo/go.mod
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
module _ // Fake go.mod auto-created by 'bingo' for go -moddir compatibility with non-Go projects. Commit this file, together with other .mod files.
module _ // Fake go.mod auto-created by 'bingo' for go -moddir compatibility with non-Go projects. Commit this file, together with other .mod files.

go 1.20
2 changes: 1 addition & 1 deletion cmd/revad/runtime/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
package runtime

import (
"github.com/cs3org/reva/v2/pkg/registry"
"github.com/rs/zerolog"
"go-micro.dev/v4/registry"
)

// Option defines a single option function.
Expand Down
18 changes: 3 additions & 15 deletions cmd/revad/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,11 @@ import (

"github.com/cs3org/reva/v2/cmd/revad/internal/grace"
"github.com/cs3org/reva/v2/pkg/logger"
"github.com/cs3org/reva/v2/pkg/registry/memory"
"github.com/cs3org/reva/v2/pkg/registry"
"github.com/cs3org/reva/v2/pkg/rgrpc"
"github.com/cs3org/reva/v2/pkg/rhttp"
"github.com/cs3org/reva/v2/pkg/sharedconf"
rtrace "github.com/cs3org/reva/v2/pkg/trace"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/rs/zerolog"
Expand All @@ -55,19 +54,8 @@ func RunWithOptions(mainConf map[string]interface{}, pidFile string, opts ...Opt
parseSharedConfOrDie(mainConf["shared"])
coreConf := parseCoreConfOrDie(mainConf["core"])

// TODO: one can pass the options from the config file to registry.New() and initialize a registry based upon config files.
if options.Registry != nil {
utils.GlobalRegistry = options.Registry
} else if _, ok := mainConf["registry"]; ok {
for _, services := range mainConf["registry"].(map[string]interface{}) {
for sName, nodes := range services.(map[string]interface{}) {
for _, instance := range nodes.([]interface{}) {
if err := utils.GlobalRegistry.Add(memory.NewService(sName, instance.(map[string]interface{})["nodes"].([]interface{}))); err != nil {
panic(err)
}
}
}
}
if err := registry.Init(options.Registry); err != nil {
panic(err)
}

run(mainConf, coreConf, options.Logger, pidFile)
Expand Down
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ require (
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.36.4
go.opentelemetry.io/otel v1.11.1
go.opentelemetry.io/otel/exporters/jaeger v1.11.1
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.11.1
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.11.1
go.opentelemetry.io/otel/sdk v1.11.1
go.opentelemetry.io/otel/trace v1.11.1
Expand All @@ -91,7 +90,6 @@ require (
google.golang.org/genproto v0.0.0-20221024183307-1bc688fe9f3e
google.golang.org/grpc v1.50.1
google.golang.org/protobuf v1.28.1
gotest.tools v2.2.0+incompatible
)

require (
Expand Down Expand Up @@ -202,6 +200,7 @@ require (
go.etcd.io/etcd/client/pkg/v3 v3.5.5 // indirect
go.mongodb.org/mongo-driver v1.10.3 // indirect
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.11.1 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.11.1 // indirect
go.opentelemetry.io/proto/otlp v0.19.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1593,8 +1593,6 @@ gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C
gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo=
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
Expand Down
2 changes: 1 addition & 1 deletion internal/grpc/interceptors/appctx/appctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (

"github.com/cs3org/reva/v2/pkg/appctx"
"github.com/rs/zerolog"
semconv "go.opentelemetry.io/otel/semconv/v1.10.0"
semconv "go.opentelemetry.io/otel/semconv/v1.16.0"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
)
Expand Down
15 changes: 10 additions & 5 deletions internal/grpc/interceptors/auth/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (

"github.com/bluele/gcache"
authpb "github.com/cs3org/go-cs3apis/cs3/auth/provider/v1beta1"
gatewayv1beta1 "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
"github.com/cs3org/reva/v2/pkg/appctx"
"github.com/cs3org/reva/v2/pkg/auth/scope"
Expand All @@ -38,7 +38,7 @@ import (
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
semconv "go.opentelemetry.io/otel/semconv/v1.10.0"
semconv "go.opentelemetry.io/otel/semconv/v1.16.0"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -273,11 +273,11 @@ func dismantleToken(ctx context.Context, tkn string, req interface{}, mgr token.
}

if sharedconf.SkipUserGroupsInToken() {
client, err := pool.GetGatewayServiceClient(gatewayAddr)
selector, err := pool.GatewaySelector(gatewayAddr)
if err != nil {
return nil, nil, err
}
groups, err := getUserGroups(ctx, u, client)
groups, err := getUserGroups(ctx, u, selector)
if err != nil {
return nil, nil, err
}
Expand All @@ -300,13 +300,18 @@ func dismantleToken(ctx context.Context, tkn string, req interface{}, mgr token.
return u, tokenScope, nil
}

func getUserGroups(ctx context.Context, u *userpb.User, client gatewayv1beta1.GatewayAPIClient) ([]string, error) {
func getUserGroups(ctx context.Context, u *userpb.User, selector pool.Selectable[gateway.GatewayAPIClient]) ([]string, error) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing ',' in parameter list

if groupsIf, err := userGroupsCache.Get(u.Id.OpaqueId); err == nil {
log := appctx.GetLogger(ctx)
log.Info().Str("userid", u.Id.OpaqueId).Msg("user groups found in cache")
return groupsIf.([]string), nil
}

client, err := selector.Next()
if err != nil {
return nil, err
}

res, err := client.GetUserGroups(ctx, &userpb.GetUserGroupsRequest{UserId: u.Id})
if err != nil {
return nil, errors.Wrap(err, "gateway: error calling GetUserGroups")
Expand Down
48 changes: 33 additions & 15 deletions internal/grpc/interceptors/auth/scope.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ const (

func expandAndVerifyScope(ctx context.Context, req interface{}, tokenScope map[string]*authpb.Scope, user *userpb.User, gatewayAddr string, mgr token.Manager) error {
log := appctx.GetLogger(ctx)
client, err := pool.GetGatewayServiceClient(gatewayAddr)
selector, err := pool.GatewaySelector(gatewayAddr)
if err != nil {
return err
}
Expand All @@ -70,24 +70,28 @@ func expandAndVerifyScope(ctx context.Context, req interface{}, tokenScope map[s
for k := range tokenScope {
switch {
case strings.HasPrefix(k, "publicshare"):
if err = resolvePublicShare(ctx, ref, tokenScope[k], client, mgr); err == nil {
if err = resolvePublicShare(ctx, ref, tokenScope[k], selector, mgr); err == nil {
return nil
}

case strings.HasPrefix(k, "share"):
if err = resolveUserShare(ctx, ref, tokenScope[k], client, mgr); err == nil {
if err = resolveUserShare(ctx, ref, tokenScope[k], selector, mgr); err == nil {
return nil
}

case strings.HasPrefix(k, "lightweight"):
if err = resolveLightweightScope(ctx, ref, tokenScope[k], user, client, mgr); err == nil {
if err = resolveLightweightScope(ctx, ref, tokenScope[k], user, selector, mgr); err == nil {
return nil
}
}
log.Err(err).Msgf("error resolving reference %s under scope %+v", ref.String(), k)
}

} else if ref, ok := extractShareRef(req); ok {
client, err := selector.Next()
if err != nil {
return err
}
// It's a share ref
// The request might be coming from a share created for a lightweight account
// after the token was minted.
Expand Down Expand Up @@ -124,13 +128,18 @@ func expandAndVerifyScope(ctx context.Context, req interface{}, tokenScope map[s
return errtypes.PermissionDenied(fmt.Sprintf("access to resource %+v not allowed within the assigned scope", req))
}

func resolveLightweightScope(ctx context.Context, ref *provider.Reference, scope *authpb.Scope, user *userpb.User, client gateway.GatewayAPIClient, mgr token.Manager) error {
func resolveLightweightScope(ctx context.Context, ref *provider.Reference, scope *authpb.Scope, user *userpb.User, selector pool.Selectable[gateway.GatewayAPIClient], mgr token.Manager) error {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing ',' in parameter list (and 10 more errors)

// Check if this ref is cached
key := "lw:" + user.Id.OpaqueId + scopeDelimiter + getRefKey(ref)
if _, err := scopeExpansionCache.Get(key); err == nil {
return nil
}

client, err := selector.Next()
if err != nil {
return err
}

shares, err := client.ListReceivedShares(ctx, &collaboration.ListReceivedSharesRequest{})
if err != nil || shares.Status.Code != rpc.Code_CODE_OK {
return errtypes.InternalError("error listing received shares")
Expand All @@ -143,7 +152,7 @@ func resolveLightweightScope(ctx context.Context, ref *provider.Reference, scope
if ref.ResourceId != nil && utils.ResourceIDEqual(share.Share.ResourceId, ref.ResourceId) {
return nil
}
if ok, err := checkIfNestedResource(ctx, ref, share.Share.ResourceId, client, mgr); err == nil && ok {
if ok, err := checkIfNestedResource(ctx, ref, share.Share.ResourceId, selector, mgr); err == nil && ok {
_ = scopeExpansionCache.SetWithExpire(key, nil, scopeCacheExpiration*time.Second)
return nil
}
Expand All @@ -152,20 +161,20 @@ func resolveLightweightScope(ctx context.Context, ref *provider.Reference, scope
return errtypes.PermissionDenied("request is not for a nested resource")
}

func resolvePublicShare(ctx context.Context, ref *provider.Reference, scope *authpb.Scope, client gateway.GatewayAPIClient, mgr token.Manager) error {
func resolvePublicShare(ctx context.Context, ref *provider.Reference, scope *authpb.Scope, selector pool.Selectable[gateway.GatewayAPIClient], mgr token.Manager) error {
var share link.PublicShare
err := utils.UnmarshalJSONToProtoV1(scope.Resource.Value, &share)
if err != nil {
return err
}

if err := checkCacheForNestedResource(ctx, ref, share.ResourceId, client, mgr); err == nil {
if err := checkCacheForNestedResource(ctx, ref, share.ResourceId, selector, mgr); err == nil {
return nil
}

// Some services like wopi don't access the shared resource relative to the
// share root but instead relative to the shared resources parent.
return checkRelativeReference(ctx, ref, share.ResourceId, client)
return checkRelativeReference(ctx, ref, share.ResourceId, selector)
}

// checkRelativeReference checks if the shared resource is being accessed via a relative reference
Expand All @@ -178,7 +187,12 @@ func resolvePublicShare(ctx context.Context, ref *provider.Reference, scope *aut
// Reference{ResourceId: {StorageId: "abcd", SpaceId: "efgh"}, Path: "./New file.txt"}
// then the request is considered relative and this function would return true.
// Only references which are relative to the immediate parent of a resource are considered valid.
func checkRelativeReference(ctx context.Context, requested *provider.Reference, sharedResourceID *provider.ResourceId, client gateway.GatewayAPIClient) error {
func checkRelativeReference(ctx context.Context, requested *provider.Reference, sharedResourceID *provider.ResourceId, selector pool.Selectable[gateway.GatewayAPIClient]) error {
client, err := selector.Next()
if err != nil {
return err
}

sRes, err := client.Stat(ctx, &provider.StatRequest{Ref: &provider.Reference{ResourceId: sharedResourceID}})
if err != nil {
return err
Expand Down Expand Up @@ -209,32 +223,36 @@ func checkRelativeReference(ctx context.Context, requested *provider.Reference,
return nil
}

func resolveUserShare(ctx context.Context, ref *provider.Reference, scope *authpb.Scope, client gateway.GatewayAPIClient, mgr token.Manager) error {
func resolveUserShare(ctx context.Context, ref *provider.Reference, scope *authpb.Scope, selector pool.Selectable[gateway.GatewayAPIClient], mgr token.Manager) error {
var share collaboration.Share
err := utils.UnmarshalJSONToProtoV1(scope.Resource.Value, &share)
if err != nil {
return err
}

return checkCacheForNestedResource(ctx, ref, share.ResourceId, client, mgr)
return checkCacheForNestedResource(ctx, ref, share.ResourceId, selector, mgr)
}

func checkCacheForNestedResource(ctx context.Context, ref *provider.Reference, resource *provider.ResourceId, client gateway.GatewayAPIClient, mgr token.Manager) error {
func checkCacheForNestedResource(ctx context.Context, ref *provider.Reference, resource *provider.ResourceId, selector pool.Selectable[gateway.GatewayAPIClient], mgr token.Manager) error {
// Check if this ref is cached
key := storagespace.FormatResourceID(*resource) + scopeDelimiter + getRefKey(ref)
if _, err := scopeExpansionCache.Get(key); err == nil {
return nil
}

if ok, err := checkIfNestedResource(ctx, ref, resource, client, mgr); err == nil && ok {
if ok, err := checkIfNestedResource(ctx, ref, resource, selector, mgr); err == nil && ok {
_ = scopeExpansionCache.SetWithExpire(key, nil, scopeCacheExpiration*time.Second)
return nil
}

return errtypes.PermissionDenied("request is not for a nested resource")
}

func checkIfNestedResource(ctx context.Context, ref *provider.Reference, parent *provider.ResourceId, client gateway.GatewayAPIClient, mgr token.Manager) (bool, error) {
func checkIfNestedResource(ctx context.Context, ref *provider.Reference, parent *provider.ResourceId, selector pool.Selectable[gateway.GatewayAPIClient], mgr token.Manager) (bool, error) {
client, err := selector.Next()
if err != nil {
return false, err
}
// Since the resource ID is obtained from the scope, the current token
// has access to it.
statResponse, err := client.Stat(ctx, &provider.StatRequest{Ref: &provider.Reference{ResourceId: parent}})
Expand Down
8 changes: 6 additions & 2 deletions internal/grpc/services/appprovider/appprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,13 @@ func (s *service) registerProvider() {
pInfo.MimeTypes = mimeTypes
}

client, err := pool.GetGatewayServiceClient(s.conf.GatewaySvc)
selector, err := pool.GatewaySelector(s.conf.GatewaySvc)
if err != nil {
log.Error().Err(err).Msg("error registering app provider: could not get gateway selector")
return
}
client, err := selector.Next()
if err != nil {
log.Error().Err(err).Msg("error registering app provider: could not get gateway client")
return
}
req := &registrypb.AddAppProviderRequest{Provider: pInfo}
Expand Down
15 changes: 10 additions & 5 deletions internal/grpc/services/gateway/authprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (s *svc) Authenticate(ctx context.Context, req *gateway.AuthenticateRequest
c, err := s.findAuthProvider(ctx, req.Type)
if err != nil {
return &gateway.AuthenticateResponse{
Status: status.NewInternal(ctx, "error getting auth provider client"),
Status: status.NewInternal(ctx, fmt.Sprintf("error getting auth provider client %s", err.Error())),
}, nil
}

Expand Down Expand Up @@ -170,9 +170,9 @@ func (s *svc) WhoAmI(ctx context.Context, req *gateway.WhoAmIRequest) (*gateway.
}

func (s *svc) findAuthProvider(ctx context.Context, authType string) (authpb.ProviderAPIClient, error) {
c, err := pool.GetAuthRegistryServiceClient(s.c.AuthRegistryEndpoint)
c, err := s.authRegistrySelector.Next()
if err != nil {
err = errors.Wrap(err, "gateway: error getting auth registry client")
err = errors.Wrap(err, "gateway: error selecting next AuthRegistry client")
return nil, err
}

Expand All @@ -187,9 +187,14 @@ func (s *svc) findAuthProvider(ctx context.Context, authType string) (authpb.Pro

if res.Status.Code == rpc.Code_CODE_OK && res.Providers != nil && len(res.Providers) > 0 {
// TODO(labkode): check for capabilities here
c, err := pool.GetAuthProviderServiceClient(res.Providers[0].Address)
sel, err := pool.AuthProviderSelector(res.Providers[0].Address)
if err != nil {
err = errors.Wrap(err, "gateway: error getting an auth provider client")
err = errors.Wrap(err, fmt.Sprintf("gateway: error getting an '%s' auth provider selector for %s", authType, res.Providers[0].Address))
return nil, err
}
c, err := sel.Next()
if err != nil {
err = errors.Wrap(err, fmt.Sprintf("gateway: error selecting next '%s' auth provider client for %s", authType, res.Providers[0].Address))
return nil, err
}

Expand Down
3 changes: 1 addition & 2 deletions internal/grpc/services/gateway/authregistry.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,10 @@ import (
gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
"github.com/cs3org/reva/v2/pkg/rgrpc/status"
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
)

func (s *svc) ListAuthProviders(ctx context.Context, req *registry.ListAuthProvidersRequest) (*gateway.ListAuthProvidersResponse, error) {
c, err := pool.GetAuthRegistryServiceClient(s.c.AuthRegistryEndpoint)
c, err := s.authRegistrySelector.Next()
if err != nil {
return &gateway.ListAuthProvidersResponse{
Status: status.NewInternal(ctx, "gateway"),
Expand Down
Loading