Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Public share json persistence #3199

Merged
merged 11 commits into from
Sep 9, 2022
5 changes: 5 additions & 0 deletions changelog/unreleased/json-publicshare-manager.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Enhancement: Add support for cs3 storage backends to the json publicshare manager

We enhanced the json publicshare manager to support a cs3 storage backend alongside the file and memory backends.

https://github.com/cs3org/reva/pull/3199
193 changes: 120 additions & 73 deletions pkg/publicshare/manager/json/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,8 @@ import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"os/signal"
"path/filepath"
"strconv"
"strings"
"sync"
Expand All @@ -43,72 +41,121 @@ import (
"github.com/cs3org/reva/v2/pkg/appctx"
"github.com/cs3org/reva/v2/pkg/errtypes"
"github.com/cs3org/reva/v2/pkg/publicshare"
"github.com/cs3org/reva/v2/pkg/publicshare/manager/json/persistence"
"github.com/cs3org/reva/v2/pkg/publicshare/manager/json/persistence/cs3"
"github.com/cs3org/reva/v2/pkg/publicshare/manager/json/persistence/file"
"github.com/cs3org/reva/v2/pkg/publicshare/manager/json/persistence/memory"
"github.com/cs3org/reva/v2/pkg/publicshare/manager/registry"
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
"github.com/cs3org/reva/v2/pkg/storage/utils/metadata"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
)

func init() {
registry.Register("json", New)
registry.Register("json", NewFile)
registry.Register("jsoncs3", NewCS3)
registry.Register("jsonmemory", NewMemory)
}

// New returns a new filesystem public shares manager.
func New(c map[string]interface{}) (publicshare.Manager, error) {
conf := &config{}
// NewFile returns a new filesystem public shares manager.
func NewFile(c map[string]interface{}) (publicshare.Manager, error) {
conf := &fileConfig{}
if err := mapstructure.Decode(c, conf); err != nil {
return nil, err
}

conf.init()
if conf.File == "" {
conf.File = "/var/tmp/reva/publicshares"
}

m := manager{
gatewayAddr: conf.GatewayAddr,
mutex: &sync.Mutex{},
file: conf.File,
passwordHashCost: conf.SharePasswordHashCost,
janitorRunInterval: conf.JanitorRunInterval,
enableExpiredSharesCleanup: conf.EnableExpiredSharesCleanup,
}

// attempt to create the db file
var fi os.FileInfo
var err error
if fi, err = os.Stat(m.file); os.IsNotExist(err) {
folder := filepath.Dir(m.file)
if err := os.MkdirAll(folder, 0755); err != nil {
return nil, err
}
if _, err := os.Create(m.file); err != nil {
return nil, err
}
p := file.New(conf.File)
if err := p.Init(context.Background()); err != nil {
return nil, err
}

if fi == nil || fi.Size() == 0 {
err := ioutil.WriteFile(m.file, []byte("{}"), 0644)
if err != nil {
return nil, err
}
return New(conf.GatewayAddr, conf.SharePasswordHashCost, conf.JanitorRunInterval, conf.EnableExpiredSharesCleanup, p)
}

// NewMemory returns a new in-memory public shares manager.
func NewMemory(c map[string]interface{}) (publicshare.Manager, error) {
conf := &commonConfig{}
if err := mapstructure.Decode(c, conf); err != nil {
return nil, err
}

go m.startJanitorRun()
conf.init()
p := memory.New()

if err := p.Init(context.Background()); err != nil {
return nil, err
}

return New(conf.GatewayAddr, conf.SharePasswordHashCost, conf.JanitorRunInterval, conf.EnableExpiredSharesCleanup, p)
}

// NewCS3 returns a new cs3 public shares manager.
func NewCS3(c map[string]interface{}) (publicshare.Manager, error) {
conf := &cs3Config{}
if err := mapstructure.Decode(c, conf); err != nil {
return nil, err
}

conf.init()

s, err := metadata.NewCS3Storage(conf.ProviderAddr, conf.ProviderAddr, conf.ServiceUserID, conf.ServiceUserIdp, conf.MachineAuthAPIKey)
if err != nil {
return nil, err
}
p := cs3.New(s)

if err := p.Init(context.Background()); err != nil {
return nil, err
}

return New(conf.GatewayAddr, conf.SharePasswordHashCost, conf.JanitorRunInterval, conf.EnableExpiredSharesCleanup, p)
}

// New returns a new public share manager instance
func New(gwAddr string, pwHashCost, janitorRunInterval int, enableCleanup bool, p persistence.Persistence) (publicshare.Manager, error) {
m := &manager{
gatewayAddr: gwAddr,
mutex: &sync.Mutex{},
passwordHashCost: pwHashCost,
janitorRunInterval: janitorRunInterval,
enableExpiredSharesCleanup: enableCleanup,
persistence: p,
}

return &m, nil
go m.startJanitorRun()
return m, nil
}

type config struct {
type commonConfig struct {
GatewayAddr string `mapstructure:"gateway_addr"`
File string `mapstructure:"file"`
SharePasswordHashCost int `mapstructure:"password_hash_cost"`
JanitorRunInterval int `mapstructure:"janitor_run_interval"`
EnableExpiredSharesCleanup bool `mapstructure:"enable_expired_shares_cleanup"`
}

func (c *config) init() {
if c.File == "" {
c.File = "/var/tmp/reva/publicshares"
}
type fileConfig struct {
commonConfig `mapstructure:",squash"`

File string `mapstructure:"file"`
}

type cs3Config struct {
commonConfig `mapstructure:",squash"`

ProviderAddr string `mapstructure:"provider_addr"`
ServiceUserID string `mapstructure:"service_user_id"`
ServiceUserIdp string `mapstructure:"service_user_idp"`
MachineAuthAPIKey string `mapstructure:"machine_auth_apikey"`
}

func (c *commonConfig) init() {
if c.SharePasswordHashCost == 0 {
c.SharePasswordHashCost = 11
}
Expand All @@ -120,7 +167,7 @@ func (c *config) init() {
type manager struct {
gatewayAddr string
mutex *sync.Mutex
file string
persistence persistence.Persistence

passwordHashCost int
janitorRunInterval int
Expand Down Expand Up @@ -153,7 +200,7 @@ func (m *manager) Dump(ctx context.Context, shareChan chan<- *publicshare.WithPa
m.mutex.Lock()
defer m.mutex.Unlock()

db, err := m.readDb()
db, err := m.persistence.Read(ctx)
if err != nil {
return err
}
Expand All @@ -170,6 +217,27 @@ func (m *manager) Dump(ctx context.Context, shareChan chan<- *publicshare.WithPa
return nil
}

// Load imports public shares and received shares from channels (e.g. during migration)
func (m *manager) Load(ctx context.Context, shareChan <-chan *publicshare.WithPassword) error {
db, err := m.persistence.Read(ctx)
if err != nil {
return err
}

for ps := range shareChan {
encShare, err := utils.MarshalProtoV1ToJSON(&ps.PublicShare)
if err != nil {
return err
}

db[ps.PublicShare.Id.GetOpaqueId()] = map[string]interface{}{
"share": string(encShare),
"password": ps.Password,
}
}
return m.persistence.Write(ctx, db)
}

// CreatePublicShare adds a new entry to manager.shares
func (m *manager) CreatePublicShare(ctx context.Context, u *user.User, rInfo *provider.ResourceInfo, g *link.Grant) (*link.PublicShare, error) {
id := &link.PublicShareId{
Expand Down Expand Up @@ -230,7 +298,7 @@ func (m *manager) CreatePublicShare(ctx context.Context, u *user.User, rInfo *pr
return nil, err
}

db, err := m.readDb()
db, err := m.persistence.Read(ctx)
if err != nil {
return nil, err
}
Expand All @@ -244,7 +312,7 @@ func (m *manager) CreatePublicShare(ctx context.Context, u *user.User, rInfo *pr
return nil, errors.New("key already exists")
}

err = m.writeDb(db)
err = m.persistence.Write(ctx, db)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -303,7 +371,7 @@ func (m *manager) UpdatePublicShare(ctx context.Context, u *user.User, req *link
m.mutex.Lock()
defer m.mutex.Unlock()

db, err := m.readDb()
db, err := m.persistence.Read(ctx)
if err != nil {
return nil, err
}
Expand All @@ -325,7 +393,7 @@ func (m *manager) UpdatePublicShare(ctx context.Context, u *user.User, req *link

db[share.Id.OpaqueId] = data

err = m.writeDb(db)
err = m.persistence.Write(ctx, db)
if err != nil {
return nil, err
}
Expand All @@ -352,7 +420,7 @@ func (m *manager) GetPublicShare(ctx context.Context, u *user.User, ref *link.Pu
m.mutex.Lock()
defer m.mutex.Unlock()

db, err := m.readDb()
db, err := m.persistence.Read(ctx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -393,7 +461,7 @@ func (m *manager) ListPublicShares(ctx context.Context, u *user.User, filters []

log := appctx.GetLogger(ctx)

db, err := m.readDb()
db, err := m.persistence.Read(ctx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -458,7 +526,7 @@ func (m *manager) cleanupExpiredShares() {
m.mutex.Lock()
defer m.mutex.Unlock()

db, _ := m.readDb()
db, _ := m.persistence.Read(context.Background())

for _, v := range db {
d := v.(map[string]interface{})["share"]
Expand Down Expand Up @@ -498,7 +566,7 @@ func (m *manager) revokeExpiredPublicShare(ctx context.Context, s *link.PublicSh
// RevokePublicShare undocumented.
func (m *manager) RevokePublicShare(ctx context.Context, u *user.User, ref *link.PublicShareReference) error {
m.mutex.Lock()
db, err := m.readDb()
db, err := m.persistence.Read(ctx)
if err != nil {
return err
}
Expand All @@ -523,11 +591,11 @@ func (m *manager) RevokePublicShare(ctx context.Context, u *user.User, ref *link

m.mutex.Lock()
defer m.mutex.Unlock()
return m.writeDb(db)
return m.persistence.Write(ctx, db)
}

func (m *manager) getByToken(ctx context.Context, token string) (*link.PublicShare, string, error) {
db, err := m.readDb()
db, err := m.persistence.Read(ctx)
if err != nil {
return nil, "", err
}
Expand All @@ -552,7 +620,7 @@ func (m *manager) getByToken(ctx context.Context, token string) (*link.PublicSha

// GetPublicShareByToken gets a public share by its opaque token.
func (m *manager) GetPublicShareByToken(ctx context.Context, token string, auth *link.PublicShareAuthentication, sign bool) (*link.PublicShare, error) {
db, err := m.readDb()
db, err := m.persistence.Read(ctx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -596,27 +664,6 @@ func (m *manager) GetPublicShareByToken(ctx context.Context, token string, auth
return nil, errtypes.NotFound(fmt.Sprintf("share with token: `%v` not found", token))
}

func (m *manager) readDb() (map[string]interface{}, error) {
db := map[string]interface{}{}
readBytes, err := ioutil.ReadFile(m.file)
if err != nil {
return nil, err
}
if err := json.Unmarshal(readBytes, &db); err != nil {
return nil, err
}
return db, nil
}

func (m *manager) writeDb(db map[string]interface{}) error {
dbAsJSON, err := json.Marshal(db)
if err != nil {
return err
}

return ioutil.WriteFile(m.file, dbAsJSON, 0644)
}

type publicShare struct {
link.PublicShare
Password string `json:"password"`
Expand Down
Loading