Skip to content

Commit

Permalink
Add cache warmup strategy for OCS resource infos (#1664)
Browse files Browse the repository at this point in the history
  • Loading branch information
ishank011 authored May 7, 2021
1 parent dfd523d commit 71ce0a9
Show file tree
Hide file tree
Showing 7 changed files with 260 additions and 12 deletions.
8 changes: 8 additions & 0 deletions changelog/unreleased/ocs-share-cache-warmup.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
Enhancement: Add cache warmup strategy for OCS resource infos

Recently, a TTL cache was added to OCS to store statted resource infos. This PR
adds an interface to define warmup strategies and also adds a cbox specific
strategy which starts a goroutine to initialize the cache with all the valid
shares present in the system.

https://github.com/cs3org/reva/pull/1664
24 changes: 13 additions & 11 deletions internal/http/services/owncloud/ocs/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,19 @@ import (

// Config holds the config options that need to be passed down to all ocs handlers
type Config struct {
Prefix string `mapstructure:"prefix"`
Config data.ConfigData `mapstructure:"config"`
Capabilities data.CapabilitiesData `mapstructure:"capabilities"`
GatewaySvc string `mapstructure:"gatewaysvc"`
DefaultUploadProtocol string `mapstructure:"default_upload_protocol"`
UserAgentChunkingMap map[string]string `mapstructure:"user_agent_chunking_map"`
SharePrefix string `mapstructure:"share_prefix"`
HomeNamespace string `mapstructure:"home_namespace"`
AdditionalInfoAttribute string `mapstructure:"additional_info_attribute"`
ResourceInfoCacheSize int `mapstructure:"resource_info_cache_size"`
ResourceInfoCacheTTL int `mapstructure:"resource_info_cache_ttl"`
Prefix string `mapstructure:"prefix"`
Config data.ConfigData `mapstructure:"config"`
Capabilities data.CapabilitiesData `mapstructure:"capabilities"`
GatewaySvc string `mapstructure:"gatewaysvc"`
DefaultUploadProtocol string `mapstructure:"default_upload_protocol"`
UserAgentChunkingMap map[string]string `mapstructure:"user_agent_chunking_map"`
SharePrefix string `mapstructure:"share_prefix"`
HomeNamespace string `mapstructure:"home_namespace"`
AdditionalInfoAttribute string `mapstructure:"additional_info_attribute"`
CacheWarmupDriver string `mapstructure:"cache_warmup_driver"`
CacheWarmupDrivers map[string]map[string]interface{} `mapstructure:"cache_warmup_drivers"`
ResourceInfoCacheSize int `mapstructure:"resource_info_cache_size"`
ResourceInfoCacheTTL int `mapstructure:"resource_info_cache_ttl"`
}

// Init sets sane defaults
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ import (
"github.com/cs3org/reva/pkg/appctx"
"github.com/cs3org/reva/pkg/rgrpc/todo/pool"
"github.com/cs3org/reva/pkg/rhttp/router"
"github.com/cs3org/reva/pkg/share/cache"
"github.com/cs3org/reva/pkg/share/cache/registry"
"github.com/pkg/errors"
)

Expand All @@ -72,6 +74,13 @@ type userIdentifiers struct {
Mail string
}

func getCacheWarmupManager(c *config.Config) (cache.Warmup, error) {
if f, ok := registry.NewFuncs[c.CacheWarmupDriver]; ok {
return f(c.CacheWarmupDrivers[c.CacheWarmupDriver])
}
return nil, fmt.Errorf("driver not found: %s", c.CacheWarmupDriver)
}

// Init initializes this and any contained handlers
func (h *Handler) Init(c *config.Config) error {
h.gatewayAddr = c.GatewaySvc
Expand All @@ -87,9 +96,27 @@ func (h *Handler) Init(c *config.Config) error {

h.resourceInfoCache = gcache.New(c.ResourceInfoCacheSize).LFU().Build()

if h.resourceInfoCacheTTL > 0 {
cwm, err := getCacheWarmupManager(c)
if err == nil {
go h.startCacheWarmup(cwm)
}
}

return nil
}

func (h *Handler) startCacheWarmup(c cache.Warmup) {
infos, err := c.GetResourceInfos()
if err != nil {
return
}
for _, r := range infos {
key := wrapResourceID(r.Id)
_ = h.resourceInfoCache.SetWithExpire(key, r, time.Second*h.resourceInfoCacheTTL)
}
}

func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
log := appctx.GetLogger(r.Context())

Expand Down
28 changes: 28 additions & 0 deletions pkg/share/cache/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright 2018-2021 CERN
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// In applying this license, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

package cache

import (
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
)

// Warmup is the interface to implement cache warmup strategies.
type Warmup interface {
GetResourceInfos() ([]*provider.ResourceInfo, error)
}
149 changes: 149 additions & 0 deletions pkg/share/cache/cbox/cbox.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
// Copyright 2018-2021 CERN
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// In applying this license, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

package eos

import (
"context"
"database/sql"
"fmt"

userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
types "github.com/cs3org/go-cs3apis/cs3/types/v1beta1"
"github.com/cs3org/reva/pkg/share/cache"
"github.com/cs3org/reva/pkg/share/cache/registry"
"github.com/cs3org/reva/pkg/storage/fs/eos"
"github.com/cs3org/reva/pkg/user"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"

// Provides mysql drivers
_ "github.com/go-sql-driver/mysql"
)

func init() {
registry.Register("cbox", New)
}

type config struct {
DbUsername string `mapstructure:"db_username"`
DbPassword string `mapstructure:"db_password"`
DbHost string `mapstructure:"db_host"`
DbPort int `mapstructure:"db_port"`
DbName string `mapstructure:"db_name"`
EOSNamespace string `mapstructure:"namespace"`
GatewaySvc string `mapstructure:"gatewaysvc"`
}

type manager struct {
conf *config
db *sql.DB
}

func parseConfig(m map[string]interface{}) (*config, error) {
c := &config{}
if err := mapstructure.Decode(m, c); err != nil {
err = errors.Wrap(err, "error decoding conf")
return nil, err
}
return c, nil
}

// New returns a new implementation of the storage.FS interface that connects to EOS.
func New(m map[string]interface{}) (cache.Warmup, error) {
c, err := parseConfig(m)
if err != nil {
return nil, err
}
db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s:%d)/%s", c.DbUsername, c.DbPassword, c.DbHost, c.DbPort, c.DbName))
if err != nil {
return nil, err
}

return &manager{
conf: c,
db: db,
}, nil
}

func (m *manager) GetResourceInfos() ([]*provider.ResourceInfo, error) {
query := "select coalesce(fileid_prefix, '') as fileid_prefix, coalesce(item_source, '') as item_source FROM oc_share WHERE (orphan = 0 or orphan IS NULL)"
rows, err := m.db.Query(query)
if err != nil {
return nil, err
}
defer rows.Close()

infos := []*provider.ResourceInfo{}
for rows.Next() {
var storageID, opaqueID string
if err := rows.Scan(&storageID, &opaqueID); err != nil {
continue
}

eosOpts := map[string]interface{}{
"namespace": m.conf.EOSNamespace,
"master_url": fmt.Sprintf("root://%s.cern.ch", storageID),
"version_invariant": true,
"gatewaysvc": m.conf.GatewaySvc,
}
eos, err := eos.New(eosOpts)
if err != nil {
return nil, err
}

ctx := user.ContextSetUser(context.Background(), &userpb.User{
Id: &userpb.UserId{
OpaqueId: "root",
},
Opaque: &types.Opaque{
Map: map[string]*types.OpaqueEntry{
"uid": &types.OpaqueEntry{
Decoder: "plain",
Value: []byte("0"),
},
"gid": &types.OpaqueEntry{
Decoder: "plain",
Value: []byte("0"),
},
},
},
})

inf, err := eos.GetMD(ctx, &provider.Reference{
Spec: &provider.Reference_Id{
Id: &provider.ResourceId{
StorageId: storageID,
OpaqueId: opaqueID,
},
},
}, []string{})
if err != nil {
return nil, err
}
infos = append(infos, inf)
}

if err = rows.Err(); err != nil {
return nil, err
}

return infos, nil

}
34 changes: 34 additions & 0 deletions pkg/share/cache/registry/registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright 2018-2021 CERN
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// In applying this license, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

package registry

import "github.com/cs3org/reva/pkg/share/cache"

// NewFunc is the function that cache warmup implementations
// should register at init time.
type NewFunc func(map[string]interface{}) (cache.Warmup, error)

// NewFuncs is a map containing all the registered cache warmup implementations.
var NewFuncs = map[string]NewFunc{}

// Register registers a new cache warmup function.
// Not safe for concurrent use. Safe for use from package init.
func Register(name string, f NewFunc) {
NewFuncs[name] = f
}
2 changes: 1 addition & 1 deletion pkg/storage/fs/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type NewFunc func(map[string]interface{}) (storage.FS, error)
// NewFuncs is a map containing all the registered storage backends.
var NewFuncs = map[string]NewFunc{}

// Register registers a new storage backend new function.
// Register registers a new storage backend function.
// Not safe for concurrent use. Safe for use from package init.
func Register(name string, f NewFunc) {
NewFuncs[name] = f
Expand Down

0 comments on commit 71ce0a9

Please sign in to comment.