Skip to content

Commit

Permalink
rgw/sfs: Change types.h Object ownership story
Browse files Browse the repository at this point in the history
Object was a shared_ptr hidden behind 'ObjectRef' typedef. The shared
ptr stems back from when there was a map of objects cache. Since we
are no longer sharing Objects remove the shared_ptr and make the
ownership of Object explicit. In most cases by using a unique_ptr.

This also changes the behavior of refresh_meta when there is not (yet)
a database record. We used to keep objref at nullptr. Now we
initialize it with a default object that has deleted = true and set
our state.exists to false.

Signed-off-by: Marcel Lauhoff <marcel.lauhoff@suse.com>
  • Loading branch information
Marcel Lauhoff committed Nov 15, 2023
1 parent cde6e5d commit 55e5eb7
Show file tree
Hide file tree
Showing 11 changed files with 190 additions and 157 deletions.
2 changes: 0 additions & 2 deletions src/rgw/driver/sfs/bucket.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,6 @@ class SFSBucket : public StoreBucket {

void write_meta(const DoutPrefixProvider* dpp);

std::unique_ptr<Object> _get_object(sfs::ObjectRef obj);

/// Verify params passed to list()
int verify_list_params(
const DoutPrefixProvider* dpp, const ListParams& params, int max
Expand Down
12 changes: 3 additions & 9 deletions src/rgw/driver/sfs/multipart.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,12 @@ SFSMultipartUploadV2::SFSMultipartUploadV2(

std::unique_ptr<rgw::sal::Object> SFSMultipartUploadV2::get_meta_obj() {
rgw_obj_key key(meta_str, string(), RGW_OBJ_NS_MULTIPART);
auto mmo =
std::make_unique<SFSMultipartMetaObject>(store, key, bucket, bucketref);

sfs::sqlite::SQLiteMultipart mpdb(store->db_conn);
auto mp = mpdb.get_multipart(upload_id);
ceph_assert(mp.has_value());
mmo->set_attrs(mp->attrs);
// TODO(jecluis): this needs to be fixed once we get rid of the objref
mmo->set_object_ref(
std::shared_ptr<sfs::Object>(sfs::Object::create_from_obj_key(key))
return std::make_unique<SFSMultipartMetaObject>(
store, key, bucket, bucketref, mp->attrs
);
return mmo;
}

int SFSMultipartUploadV2::init(
Expand Down Expand Up @@ -431,7 +425,7 @@ int SFSMultipartUploadV2::complete(
// new object, or a new version, and move the file to its location as if we
// were writing directly to it.

ObjectRef objref;
std::unique_ptr<Object> objref;
try {
objref = bucketref->create_version(target_obj->get_key());
} catch (const std::system_error& e) {
Expand Down
54 changes: 48 additions & 6 deletions src/rgw/driver/sfs/multipart.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,54 @@ class SFStore;
* bits that are relevant for the SAL layer's expected path.
*
* For reference, check 'rgw_op.cc', RGWCompleteMultipart::execute().
*
* Note that during MP uploads, before completing them, the object is
* actually a promise of a future object. To make this promise
* accessible to certain queries (e.g get_attr) initialize the
* underlying objref from MP data rather than object / versioned
* object data.
*/
struct SFSMultipartMetaObject : public rgw::sal::SFSObject {
SFSMultipartMetaObject(SFSMultipartMetaObject&) = default;
SFSMultipartMetaObject(SFSMultipartMetaObject&) = delete;
SFSMultipartMetaObject(
rgw::sal::SFStore* _st, const rgw_obj_key& _k, rgw::sal::Bucket* _b,
BucketRef _bucket
BucketRef _bucket, const rgw::sal::Attrs& attrs
)
: rgw::sal::SFSObject(_st, _k, _b, _bucket, false) {}
: rgw::sal::SFSObject(_st, _k, _b, _bucket, false) {
// Note: objref points to a object that does not actually exists.
objref.reset(sfs::Object::create_from_obj_key(_k));
objref->update_attrs(attrs);
}

struct SFSMetaObjReadOp : public ReadOp {
private:
const sfs::Object& obj;

public:
SFSMetaObjReadOp() = delete;
SFSMetaObjReadOp(const sfs::Object& _obj) : obj(_obj) {}
virtual int prepare(optional_yield, const DoutPrefixProvider*) override {
return 0;
}
virtual int
read(int64_t, int64_t, bufferlist&, optional_yield, const DoutPrefixProvider*)
override {
return -ENOTSUP;
}
virtual int iterate(
const DoutPrefixProvider*, int64_t, int64_t, RGWGetDataCB*,
optional_yield
) override {
return -ENOTSUP;
}
virtual int get_attr(
const DoutPrefixProvider*, const char* name, bufferlist& dest,
optional_yield
) override {
return obj.get_attr(name, dest);
}
const std::string get_cls_name() { return "mp_meta_obj_read"; }
};

struct SFSMetaObjDeleteOp : public DeleteOp {
SFSMetaObjDeleteOp() = default;
Expand All @@ -55,9 +95,7 @@ struct SFSMultipartMetaObject : public rgw::sal::SFSObject {
const std::string get_cls_name() { return "mp_meta_obj_delete"; }
};

virtual std::unique_ptr<Object> clone() override {
return std::unique_ptr<Object>(new SFSMultipartMetaObject{*this});
}
virtual std::unique_ptr<Object> clone() override { return nullptr; }
SFSMultipartMetaObject& operator=(const SFSMultipartMetaObject&) = delete;

virtual std::unique_ptr<DeleteOp> get_delete_op() override {
Expand All @@ -70,6 +108,10 @@ struct SFSMultipartMetaObject : public rgw::sal::SFSObject {
) override {
return 0;
}

virtual std::unique_ptr<ReadOp> get_read_op() override {
return std::make_unique<SFSMetaObjReadOp>(*objref);
}
};

class SFSMultipartPartV2 : public StoreMultipartPart {
Expand Down
56 changes: 25 additions & 31 deletions src/rgw/driver/sfs/object.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,8 @@ using namespace std;

namespace rgw::sal {

SFSObject::SFSReadOp::SFSReadOp(SFSObject* _source) : source(_source) {
/*
This initialization code was originally into prepare() but that
was not sufficient to cover all cases.
There are pieces of SAL code that are calling get_*() methods
but they don't call prepare().
In those cases the SFSReadOp is not properly initialized and those
calls are going to fail.
*/
// read op needs to retrieve also the version_id from the db
source->refresh_meta(true);
objref = source->get_object_ref();
}
SFSObject::SFSReadOp::SFSReadOp(const SFSObject& _source)
: source(_source), objref(_source.get_object_ref()) {}

// Handle conditional GET params. If-Match, If-None-Match,
// If-Modified-Since, If-UnModified-Since. Return 0 if we are neutral.
Expand All @@ -52,8 +41,8 @@ int SFSObject::SFSReadOp::handle_conditionals(const DoutPrefixProvider* dpp
!params.unmod_ptr) {
return 0;
}
const std::string etag = objref->get_meta().etag;
const auto mtime = objref->get_meta().mtime;
const std::string etag = objref.get_meta().etag;
const auto mtime = objref.get_meta().mtime;
int result = 0;

if (params.if_match) {
Expand Down Expand Up @@ -114,13 +103,13 @@ int SFSObject::SFSReadOp::handle_conditionals(const DoutPrefixProvider* dpp
int SFSObject::SFSReadOp::prepare(
optional_yield /*y*/, const DoutPrefixProvider* dpp
) {
if (!objref || objref->deleted) {
if (objref.deleted) {
// at this point, we don't have an objectref because
// the object does not exist.
return -ENOENT;
}

objdata = source->store->get_data_path() / objref->get_storage_path();
objdata = source.store->get_data_path() / objref.get_storage_path();
if (!std::filesystem::exists(objdata)) {
lsfs_verb(dpp) << "object data not found at " << objdata << dendl;
return -ENOENT;
Expand All @@ -130,15 +119,15 @@ int SFSObject::SFSReadOp::prepare(
<< fmt::format(
"bucket:{} obj:{} size:{} versionid:{} "
"conditionals:(ifmatch:{} ifnomatch:{} ifmod:{} ifunmod:{})",
source->bucket->get_name(), source->get_name(),
source->get_obj_size(), source->get_instance(),
source.bucket->get_name(), source.get_name(),
source.get_obj_size(), source.get_instance(),
fmt::ptr(params.if_match), fmt::ptr(params.if_nomatch),
fmt::ptr(params.mod_ptr), fmt::ptr(params.unmod_ptr)
)
<< dendl;

if (params.lastmod) {
*params.lastmod = source->get_mtime();
*params.lastmod = source.get_mtime();
}
return handle_conditionals(dpp);
}
Expand All @@ -147,10 +136,10 @@ int SFSObject::SFSReadOp::get_attr(
const DoutPrefixProvider* /*dpp*/, const char* name, bufferlist& dest,
optional_yield /*y*/
) {
if (!objref || objref->deleted) {
if (objref.deleted) {
return -ENOENT;
}
if (!objref->get_attr(name, dest)) {
if (!objref.get_attr(name, dest)) {
return -ENODATA;
}
return 0;
Expand All @@ -163,9 +152,9 @@ int SFSObject::SFSReadOp::read(
) {
// TODO bounds check, etc.
const auto len = end + 1 - ofs;
lsfs_debug(dpp) << "bucket: " << source->bucket->get_name()
<< ", obj: " << source->get_name()
<< ", size: " << source->get_obj_size() << ", offset: " << ofs
lsfs_debug(dpp) << "bucket: " << source.bucket->get_name()
<< ", obj: " << source.get_name()
<< ", size: " << source.get_obj_size() << ", offset: " << ofs
<< ", end: " << end << ", len: " << len << dendl;

ceph_assert(std::filesystem::exists(objdata));
Expand All @@ -187,9 +176,9 @@ int SFSObject::SFSReadOp::iterate(
) {
// TODO bounds check, etc.
const auto len = end + 1 - ofs;
lsfs_debug(dpp) << "bucket: " << source->bucket->get_name()
<< ", obj: " << source->get_name()
<< ", size: " << source->get_obj_size() << ", offset: " << ofs
lsfs_debug(dpp) << "bucket: " << source.bucket->get_name()
<< ", obj: " << source.get_name()
<< ", size: " << source.get_obj_size() << ", offset: " << ofs
<< ", end: " << end << ", len: " << len << dendl;

ceph_assert(std::filesystem::exists(objdata));
Expand Down Expand Up @@ -243,7 +232,7 @@ int SFSObject::SFSDeleteOp::delete_obj(

auto version_id = source->get_instance();
std::string delete_marker_version_id;
if (source->objref) {
if (source->state.exists) {
bucketref->delete_object(
*source->objref, source->get_key(),
source->bucket->versioning_enabled(), delete_marker_version_id
Expand Down Expand Up @@ -335,7 +324,7 @@ int SFSObject::copy_object(
return -ERR_INTERNAL_ERROR;
}

const sfs::ObjectRef dstref =
const std::unique_ptr<sfs::Object> dstref =
dst_bucket_ref->create_version(dst_object->get_key());
if (!dstref) {
::close(src_fd);
Expand Down Expand Up @@ -643,7 +632,12 @@ void SFSObject::refresh_meta(bool update_version_id_from_metadata) {
try {
objref = bucketref->get(rgw_obj_key(get_name(), get_instance()));
} catch (sfs::UnknownObjectException& e) {
// object probably not created yet?
objref = std::unique_ptr<sfs::Object>(sfs::Object::create_from_obj_key(
rgw_obj_key(get_name(), get_instance())
));
objref->deleted = true;
state.exists = false;
// object probably not created yet - return a deleted placeholder
return;
}
_refresh_meta_from_object(*objref, update_version_id_from_metadata);
Expand Down
28 changes: 15 additions & 13 deletions src/rgw/driver/sfs/object.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,32 +30,35 @@ class SFStore;

class SFSObject : public StoreObject {
private:
SFStore* store;
RGWAccessControlPolicy acls;
sfs::BucketRef bucketref;
sfs::ObjectRef objref;

protected:
SFSObject(SFSObject&) = default;
SFStore* store;
sfs::BucketRef bucketref;
std::unique_ptr<sfs::Object> objref;

SFSObject(SFSObject&) = delete;

void _refresh_meta_from_object(
const sfs::Object& obj_to_refresh,
bool update_version_id_from_metadata = false
);

const sfs::Object& get_object_ref() const { return *objref; }

public:
/**
* reads an object's contents.
*/
struct SFSReadOp : public ReadOp {
private:
SFSObject* source;
sfs::ObjectRef objref;
const SFSObject& source;
const sfs::Object& objref;
std::filesystem::path objdata;
int handle_conditionals(const DoutPrefixProvider* dpp) const;

public:
SFSReadOp(SFSObject* _source);
SFSReadOp(const SFSObject& _source);

virtual int prepare(optional_yield y, const DoutPrefixProvider* dpp)
override;
Expand Down Expand Up @@ -109,7 +112,9 @@ class SFSObject : public StoreObject {
}

virtual std::unique_ptr<Object> clone() override {
return std::unique_ptr<Object>(new SFSObject{*this});
return std::unique_ptr<Object>(
new SFSObject(store, get_key(), get_bucket(), bucketref, true)
);
}

virtual int delete_object(
Expand Down Expand Up @@ -184,7 +189,8 @@ class SFSObject : public StoreObject {
* Obtain a Read Operation.
*/
virtual std::unique_ptr<ReadOp> get_read_op() override {
return std::make_unique<SFSObject::SFSReadOp>(this);
this->refresh_meta(true);
return std::make_unique<SFSObject::SFSReadOp>(*this);
}
/**
* Obtain a Delete Operation.
Expand Down Expand Up @@ -229,10 +235,6 @@ class SFSObject : public StoreObject {

bool get_attr(const std::string& name, bufferlist& dest);

sfs::ObjectRef get_object_ref() { return objref; }

void set_object_ref(sfs::ObjectRef objref) { this->objref = objref; }

// Refresh metadata from db.
// Also retrieves version_id when specified.
// There are situations (like delete operations) in which we don't want to
Expand Down
15 changes: 10 additions & 5 deletions src/rgw/driver/sfs/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -242,14 +242,14 @@ void Object::delete_object_data(SFStore* store) const {
std::filesystem::remove(folder_path, delete_folder_error);
}

ObjectRef Bucket::create_version(const rgw_obj_key& key) const {
std::unique_ptr<Object> Bucket::create_version(const rgw_obj_key& key) const {
// even if a specific version was not asked we generate one
// non-versioned bucket objects will also have a version_id
auto version_id = key.instance;
if (version_id.empty()) {
version_id = generate_new_version_id(store->ceph_context());
}
ObjectRef result;
std::unique_ptr<Object> result;
sqlite::SQLiteVersionedObjects objs_versions(store->db_conn);
// create objects in a transaction.
// That way threads trying to create the same object in parallel will be
Expand All @@ -263,7 +263,7 @@ ObjectRef Bucket::create_version(const rgw_obj_key& key) const {
return result;
}

ObjectRef Bucket::get(const rgw_obj_key& key) const {
std::unique_ptr<Object> Bucket::get(const rgw_obj_key& key) const {
auto maybe_result = Object::try_fetch_from_database(
store, key.name, info.bucket.bucket_id, key.instance,
get_info().versioning_enabled()
Expand All @@ -273,7 +273,7 @@ ObjectRef Bucket::get(const rgw_obj_key& key) const {
throw UnknownObjectException();
}

return std::shared_ptr<Object>(maybe_result);
return std::unique_ptr<Object>(maybe_result);
}

bool Bucket::delete_object(
Expand Down Expand Up @@ -310,7 +310,7 @@ bool Bucket::delete_object(
std::string Bucket::create_non_existing_object_delete_marker(
const rgw_obj_key& key
) const {
auto obj = std::shared_ptr<Object>(
auto obj = std::unique_ptr<Object>(
Object::create_commit_delete_marker(key, store, info.bucket.bucket_id)
);
// create the delete marker
Expand Down Expand Up @@ -350,6 +350,11 @@ bool Bucket::_delete_object_non_versioned(
) const {
auto version_to_delete =
db_versioned_objs.get_last_versioned_object(obj.path.get_uuid());
if (!version_to_delete.has_value()) {
// if there is no latest version to delete, assume that we are
// already deleted and do nothing
return true;
}
return _delete_object_version(db_versioned_objs, *version_to_delete);
}

Expand Down
Loading

0 comments on commit 55e5eb7

Please sign in to comment.