Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

decomposedfs: write metadata once #3816

Merged
merged 3 commits into from
Apr 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions changelog/unreleased/write-metadata-once.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Bugfix: Write Metadata once

decomposedfs now aggregates metadata when creating directories and spaces into a single write.

https://github.com/cs3org/reva/pull/3816
11 changes: 0 additions & 11 deletions pkg/storage/utils/decomposedfs/decomposedfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ import (
"github.com/cs3org/reva/v2/pkg/storage/utils/chunking"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/migrator"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options"
Expand Down Expand Up @@ -589,16 +588,6 @@ func (fs *Decomposedfs) CreateDir(ctx context.Context, ref *provider.Reference)
return
}

if fs.o.TreeTimeAccounting || fs.o.TreeSizeAccounting {
// mark the home node as the end of propagation
if err = n.SetXattrString(prefixes.PropagationAttr, "1"); err != nil {
appctx.GetLogger(ctx).Error().Err(err).Interface("node", n).Msg("could not mark node to propagate")

// FIXME: This does not return an error at all, but results in a severe situation that the
// part tree is not marked for propagation
return
}
}
return
}

Expand Down
54 changes: 15 additions & 39 deletions pkg/storage/utils/decomposedfs/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,48 +164,22 @@ func (n *Node) SetType(t provider.ResourceType) {
n.nodeType = &t
}

// ChangeOwner sets the owner of n to newOwner
func (n *Node) ChangeOwner(new *userpb.UserId) (err error) {
n.SpaceRoot.owner = new

attribs := Attributes{}
attribs.SetString(prefixes.OwnerIDAttr, new.OpaqueId)
attribs.SetString(prefixes.OwnerIDPAttr, new.Idp)
attribs.SetString(prefixes.OwnerTypeAttr, utils.UserTypeToString(new.Type))

if err := n.SpaceRoot.SetXattrs(attribs, true); err != nil {
return err
}

return
}

// WriteAllNodeMetadata writes the Node metadata to disk
func (n *Node) WriteAllNodeMetadata(ctx context.Context) (err error) {
// NodeMetadata writes the Node metadata to disk and allows passing additional attributes
func (n *Node) NodeMetadata() Attributes {
attribs := Attributes{}
attribs.SetInt64(prefixes.TypeAttr, int64(n.Type()))
attribs.SetString(prefixes.ParentidAttr, n.ParentID)
attribs.SetString(prefixes.NameAttr, n.Name)
attribs.SetString(prefixes.BlobIDAttr, n.BlobID)
attribs.SetInt64(prefixes.BlobsizeAttr, n.Blobsize)

return n.SetXattrs(attribs, true)
if n.Type() == provider.ResourceType_RESOURCE_TYPE_FILE {
attribs.SetString(prefixes.BlobIDAttr, n.BlobID)
attribs.SetInt64(prefixes.BlobsizeAttr, n.Blobsize)
}
return attribs
}

// WriteOwner writes the space owner
func (n *Node) WriteOwner(owner *userpb.UserId) error {
// SetOwner sets the space owner on the node
func (n *Node) SetOwner(owner *userpb.UserId) {
n.SpaceRoot.owner = owner

attribs := Attributes{}
attribs.SetString(prefixes.OwnerIDAttr, owner.OpaqueId)
attribs.SetString(prefixes.OwnerIDPAttr, owner.Idp)
attribs.SetString(prefixes.OwnerTypeAttr, utils.UserTypeToString(owner.Type))

if err := n.SpaceRoot.SetXattrs(attribs, true); err != nil {
return err
}
n.SpaceRoot.owner = owner
return nil
}

// SpaceOwnerOrManager returns the space owner of the space. If no owner is set
Expand Down Expand Up @@ -354,11 +328,13 @@ func ReadNode(ctx context.Context, lu PathLookup, spaceID, nodeID string, canLis

if revisionSuffix == "" {
n.BlobID = attrs.String(prefixes.BlobIDAttr)
blobSize, err := attrs.Int64(prefixes.BlobsizeAttr)
if err != nil {
return nil, err
if n.BlobID != "" {
blobSize, err := attrs.Int64(prefixes.BlobsizeAttr)
if err != nil {
return nil, err
}
n.Blobsize = blobSize
}
n.Blobsize = blobSize
} else {
n.BlobID, err = lu.ReadBlobIDAttr(nodePath + revisionSuffix)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/utils/decomposedfs/node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ var _ = Describe("Node", func() {
n.BlobID = "TestBlobID"
n.Blobsize = blobsize

err = n.WriteAllNodeMetadata(env.Ctx)
err = n.SetXattrs(n.NodeMetadata(), true)
Expect(err).ToNot(HaveOccurred())
n2, err := env.Lookup.NodeFromResource(env.Ctx, ref)
Expect(err).ToNot(HaveOccurred())
Expand Down
33 changes: 15 additions & 18 deletions pkg/storage/utils/decomposedfs/spaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,28 +101,18 @@ func (fs *Decomposedfs) CreateStorageSpace(ctx context.Context, req *provider.Cr
return nil, errors.Wrap(err, "Decomposedfs: error creating node")
}

if err := root.WriteAllNodeMetadata(ctx); err != nil {
return nil, err
}
var owner *userv1beta1.UserId
if req.GetOwner() != nil && req.GetOwner().GetId() != nil {
owner = req.GetOwner().GetId()
root.SetOwner(req.GetOwner().GetId())
} else {
owner = &userv1beta1.UserId{OpaqueId: spaceID, Type: userv1beta1.UserType_USER_TYPE_SPACE_OWNER}
}
if err := root.WriteOwner(owner); err != nil {
return nil, err
}

err = fs.updateIndexes(ctx, req.GetOwner().GetId().GetOpaqueId(), req.Type, root.ID)
if err != nil {
return nil, err
root.SetOwner(&userv1beta1.UserId{OpaqueId: spaceID, Type: userv1beta1.UserType_USER_TYPE_SPACE_OWNER})
}

metadata := make(node.Attributes, 6)
metadata := node.Attributes{}
metadata.SetString(prefixes.OwnerIDAttr, root.Owner().GetOpaqueId())
metadata.SetString(prefixes.OwnerIDPAttr, root.Owner().GetIdp())
metadata.SetString(prefixes.OwnerTypeAttr, utils.UserTypeToString(root.Owner().GetType()))

// always enable propagation on the storage space root
// mark the space root node as the end of propagation
// always mark the space root node as the end of propagation
metadata.SetString(prefixes.PropagationAttr, "1")
metadata.SetString(prefixes.NameAttr, req.Name)
metadata.SetString(prefixes.SpaceNameAttr, req.Name)
Expand Down Expand Up @@ -151,14 +141,20 @@ func (fs *Decomposedfs) CreateStorageSpace(ctx context.Context, req *provider.Cr
metadata.SetString(prefixes.SpaceAliasAttr, alias)
}

// Write node
if err := root.SetXattrs(metadata, true); err != nil {
return nil, err
}

// Write index
err = fs.updateIndexes(ctx, req.GetOwner().GetId().GetOpaqueId(), req.Type, root.ID)
if err != nil {
return nil, err
}

ctx = context.WithValue(ctx, utils.SpaceGrant, struct{ SpaceType string }{SpaceType: req.Type})

if req.Type != _spaceTypePersonal {
u := ctxpkg.ContextMustGetUser(ctx)
if err := fs.AddGrant(ctx, &provider.Reference{
ResourceId: &provider.ResourceId{
SpaceId: spaceID,
Expand Down Expand Up @@ -639,6 +635,7 @@ func (fs *Decomposedfs) DeleteStorageSpace(ctx context.Context, req *provider.De
}

// FIXME remove space blobs
// FIXME invalidate cache

return nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/utils/decomposedfs/testhelpers/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func (t *TestEnv) CreateTestFile(name, blobID, parentID, spaceID string, blobSiz
if err != nil {
return nil, err
}
err = n.WriteAllNodeMetadata(context.Background())
err = n.SetXattrs(n.NodeMetadata(), true)
if err != nil {
return nil, err
}
Expand Down
43 changes: 23 additions & 20 deletions pkg/storage/utils/decomposedfs/tree/tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,15 +151,15 @@ func (t *Tree) TouchFile(ctx context.Context, n *node.Node, markprocessing bool)
return errors.Wrap(err, "Decomposedfs: error creating node")
}

err = n.WriteAllNodeMetadata(ctx)
attributes := n.NodeMetadata()
if markprocessing {
attributes[prefixes.StatusPrefix] = []byte(node.ProcessingStatus)
}
err = n.SetXattrs(attributes, true)
if err != nil {
return err
}

if markprocessing {
_ = n.SetXattr(prefixes.StatusPrefix, []byte(node.ProcessingStatus))
}

// link child name to parent if it is new
childNameLink := filepath.Join(n.ParentPath(), n.Name)
var link string
Expand Down Expand Up @@ -196,10 +196,6 @@ func (t *Tree) CreateDir(ctx context.Context, n *node.Node) (err error) {
return
}

if err := n.SetTreeSize(0); err != nil {
return err
}

// make child appear in listings
relativeNodePath := filepath.Join("../../../../../", lookup.Pathify(n.ID, 4, 2))
err = os.Symlink(relativeNodePath, filepath.Join(n.ParentPath(), n.Name))
Expand Down Expand Up @@ -907,7 +903,12 @@ func (t *Tree) createDirNode(ctx context.Context, n *node.Node) (err error) {
return errors.Wrap(err, "Decomposedfs: error creating node")
}

return n.WriteAllNodeMetadata(ctx)
attributes := n.NodeMetadata()
attributes[prefixes.TreesizeAttr] = []byte("0") // initialize as empty, TODO why bother? if it is not set we could treat it as 0?
if t.options.TreeTimeAccounting || t.options.TreeSizeAccounting {
attributes[prefixes.PropagationAttr] = []byte("1") // mark the node for propagation
}
return n.SetXattrs(attributes, true)
}

var nodeIDRegep = regexp.MustCompile(`.*/nodes/([^.]*).*`)
Expand Down Expand Up @@ -938,19 +939,21 @@ func (t *Tree) readRecycleItem(ctx context.Context, spaceID, key, path string) (
if err != nil {
return
}
recycleNode.SetType(t.lookup.TypeFromPath(recycleNode.InternalPath()))
recycleNode.SetType(t.lookup.TypeFromPath(deletedNodePath))

var attrBytes []byte
// lookup blobID in extended attributes
if attrBytes, err = backend.Get(deletedNodePath, prefixes.BlobIDAttr); err == nil {
recycleNode.BlobID = string(attrBytes)
} else {
return
}
if recycleNode.Type() == provider.ResourceType_RESOURCE_TYPE_FILE {
// lookup blobID in extended attributes
if attrBytes, err = backend.Get(deletedNodePath, prefixes.BlobIDAttr); err == nil {
recycleNode.BlobID = string(attrBytes)
} else {
return
}

// lookup blobSize in extended attributes
if recycleNode.Blobsize, err = backend.GetInt64(deletedNodePath, prefixes.BlobsizeAttr); err != nil {
return
// lookup blobSize in extended attributes
if recycleNode.Blobsize, err = backend.GetInt64(deletedNodePath, prefixes.BlobsizeAttr); err != nil {
return
}
}

// lookup parent id in extended attributes
Expand Down
6 changes: 2 additions & 4 deletions pkg/storage/utils/decomposedfs/tree/tree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,8 @@ var _ = Describe("Tree", func() {
JustBeforeEach(func() {
trashPath = path.Join(env.Root, "spaces", lookup.Pathify(n.SpaceRoot.ID, 1, 2), "trash", lookup.Pathify(n.ID, 4, 2))
Expect(t.Delete(env.Ctx, n)).To(Succeed())

env.Blobstore.On("Delete", mock.Anything).Return(nil)
})

Describe("PurgeRecycleItemFunc", func() {
Expand All @@ -279,10 +281,6 @@ var _ = Describe("Tree", func() {
_, err := os.Stat(trashPath)
Expect(err).To(HaveOccurred())
})

It("does not try to delete a blob from the blobstore", func() {
env.Blobstore.AssertNotCalled(GinkgoT(), "Delete", mock.AnythingOfType("*node.Node"))
})
Comment on lines -282 to -285
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hm ... were we testing a bug? Or was there a reason for asserting NO delete?

})
})
})
Expand Down