Skip to content
This repository has been archived by the owner on Jan 3, 2024. It is now read-only.

Commit

Permalink
rgw/sfs: Remove Bucket objects, obj_map_lock
Browse files Browse the repository at this point in the history
Remove objects cache. Replace with direct Object creation from our
SQLite database. This changes semantics of Objects retrieved from
Bucket::get_or_create. Previously they pointed to the _same_
in-memory, database object representation. Now, subsequent calls
return different Objects created fresh from the database each time.

Signed-off-by: Marcel Lauhoff <marcel.lauhoff@suse.com>
  • Loading branch information
Marcel Lauhoff committed Mar 1, 2023
1 parent f2a442f commit 143d1fc
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 75 deletions.
175 changes: 112 additions & 63 deletions src/rgw/driver/sfs/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "rgw/rgw_sal_sfs.h"
#include "rgw/driver/sfs/types.h"
#include "rgw/driver/sfs/object_state.h"
#include "sqlite_objects.h"

namespace rgw::sal::sfs {

Expand Down Expand Up @@ -72,14 +73,20 @@ Object* Object::create_commit_delete_marker(const rgw_obj_key& key,
}

Object* Object::create_commit_new_object(const rgw_obj_key& key, SFStore* store,
const std::string& bucket_id) {
const std::string& bucket_id,
const std::string* version_id) {
Object* result = new Object(key);

if (version_id != nullptr) {
result->instance = *version_id;
}

sqlite::DBOPObjectInfo oinfo;
oinfo.uuid = result->path.get_uuid();
oinfo.bucket_id = bucket_id;
oinfo.name = result->name;


// TODO(irq0) make object and version insert a transaction
sqlite::SQLiteObjects dbobjs(store->db_conn);
dbobjs.store_object(oinfo);
Expand All @@ -93,6 +100,63 @@ Object* Object::create_commit_new_object(const rgw_obj_key& key, SFStore* store,
return result;
}

Object* Object::try_create_with_last_version_from_database_fetch(
SFStore* store, const std::string& name, const std::string& bucket_id) {
sqlite::SQLiteObjects objs(store->db_conn);
auto obj = objs.get_object(bucket_id, name);
if (!obj) {
return nullptr;
}

sqlite::SQLiteVersionedObjects objs_versions(store->db_conn);
auto last_version = objs_versions.get_last_versioned_object(obj->uuid);
if (!last_version.has_value()) {
return nullptr;
}

Object* result = new Object(name, obj->uuid);
result->deleted = (last_version->object_state == ObjectState::DELETED);
result->version_id = last_version->id;
result->meta = {.size = obj->size,
.etag = obj->etag,
.mtime = obj->mtime,
.set_mtime = obj->set_mtime,
.delete_at = obj->delete_at};
result->attrs = last_version->attrs;
result->instance = last_version->version_id;

return result;
}

Object* Object::try_create_from_database_fetch(SFStore* store,
const std::string& name,
const std::string& bucket_id,
const std::string& version_id) {
sqlite::SQLiteObjects objs(store->db_conn);
auto obj = objs.get_object(bucket_id, name);
if (!obj) {
return nullptr;
}

sqlite::SQLiteVersionedObjects objs_versions(store->db_conn);
auto version = objs_versions.get_versioned_object(version_id);
if (!version.has_value()) {
return nullptr;
}

Object* result = new Object(name, obj->uuid);
result->deleted = (version->object_state == ObjectState::DELETED);
result->version_id = version->id;
result->meta = {.size = obj->size,
.etag = obj->etag,
.mtime = obj->mtime,
.set_mtime = obj->set_mtime,
.delete_at = obj->delete_at};
result->attrs = version->attrs;
result->instance = version->version_id;
return result;
}

std::filesystem::path Object::get_storage_path() const {
return path.to_path() / std::to_string(version_id);
}
Expand Down Expand Up @@ -134,13 +198,14 @@ void Object::update_attrs(const Attrs& update) {
attrs = update;
}

void Object::add_new_version(SFStore *store) {
void Object::update_commit_new_version(SFStore *store, const std::string& new_version) {
sqlite::DBOPVersionedObjectInfo version_info;
version_info.object_id = path.get_uuid();
version_info.object_state = ObjectState::OPEN;
version_info.version_id = instance;
version_info.version_id = new_version;
sqlite::SQLiteVersionedObjects db_versioned_objs(store->db_conn);
version_id = db_versioned_objs.insert_versioned_object(version_info);
instance = new_version;
}

void Object::metadata_change_version_state(SFStore *store, ObjectState state) {
Expand Down Expand Up @@ -257,52 +322,75 @@ void MultipartUpload::abort(const DoutPrefixProvider *dpp) {
objref.reset();
}

bool Bucket::want_new_version(const rgw_obj_key& key, ObjectRef obj) {
return !(key.instance.empty() || key.instance == obj->instance);
bool Bucket::want_specific_version(const rgw_obj_key& key) {
return !key.instance.empty();
}

ObjectRef Bucket::get_or_create(const rgw_obj_key& key) {
std::lock_guard l(obj_map_lock);

ObjectRef result;
try {
result = get_unmutexed(key.name);
if (want_new_version(key, result)) {
result->add_new_version(store);

auto maybe_result = Object::try_create_with_last_version_from_database_fetch(
store, key.name, info.bucket.bucket_id);

if (maybe_result == nullptr) { // new object
result.reset(Object::create_commit_new_object(
key, store, info.bucket.bucket_id, &key.instance));
return result;
}

// an object exists with at least 1 version
if (want_specific_version(key) && maybe_result->instance == key.instance) {
// requested version happens to be the last version
result.reset(maybe_result);
} else if (want_specific_version(key) &&
maybe_result->instance != key.instance) {
// requested version is not last

auto specific_version_object = Object::try_create_from_database_fetch(
store, key.name, info.bucket.bucket_id, key.instance);

if (specific_version_object == nullptr) {
// requested version does not exist -> create it from last
// version object
result.reset(maybe_result);
result->update_commit_new_version(store, key.instance);
} else {
// requested version does exist -> return it
result.reset(specific_version_object);
}
} catch (const UnknownObjectException& _) {
result = std::shared_ptr<Object>(
Object::create_commit_new_object(key, store, info.bucket.bucket_id));
objects[key.name] = result;
} else {
// no specific version requested - return last
result.reset(maybe_result);
}

ceph_assert(result);
return result;
}

ObjectRef Bucket::get_unmutexed(const std::string& name) {
auto it = objects.find(name);
if (it == objects.end()) {
auto maybe_result = Object::try_create_with_last_version_from_database_fetch(
store, name, info.bucket.bucket_id);

if (maybe_result == nullptr) {
throw UnknownObjectException();
}
return it->second;
return std::shared_ptr<Object>(std::move(maybe_result));
}

ObjectRef Bucket::get(const std::string& name) {
std::lock_guard l(obj_map_lock);
return get_unmutexed(name);
}

std::vector<ObjectRef> Bucket::get_all() {
std::vector<ObjectRef> result;
std::lock_guard l(obj_map_lock);
for (const auto& [name, objref] : objects) {
result.push_back(objref);
sqlite::SQLiteObjects dbobjs(store->db_conn);
for (const auto& db_obj : dbobjs.get_objects(info.bucket.bucket_id)) {
result.push_back(get(db_obj.name));
}
return result;
}

void Bucket::delete_object(ObjectRef objref, const rgw_obj_key & key) {
std::lock_guard l(obj_map_lock);

sqlite::SQLiteVersionedObjects db_versioned_objs(store->db_conn);
// get the last available version to make a copy changing the object state to DELETED
auto last_version = db_versioned_objs.get_last_versioned_object(objref->path.get_uuid());
Expand Down Expand Up @@ -331,11 +419,8 @@ void Bucket::delete_object(ObjectRef objref, const rgw_obj_key & key) {

std::string Bucket::create_non_existing_object_delete_marker(
const rgw_obj_key & key) {
std::lock_guard l(obj_map_lock);

auto obj = std::shared_ptr<Object>(
Object::create_commit_delete_marker(key, store, info.bucket.bucket_id));
objects[key.name] = obj;
// create the delete marker
// generate a new version id
#define OBJ_INSTANCE_LEN 32
Expand Down Expand Up @@ -369,7 +454,6 @@ void Bucket::_undelete_object(ObjectRef objref, const rgw_obj_key & key,
objref->deleted = false;
} else {
// all versions were removed for this object
objects.erase(key.name);
}
}
} else {
Expand All @@ -382,39 +466,4 @@ void Bucket::_undelete_object(ObjectRef objref, const rgw_obj_key & key,
}
}

std::optional<ObjectRef> Bucket::get_from_db(const std::string& name) {
sqlite::SQLiteObjects objs(store->db_conn);
auto obj = objs.get_object(info.bucket.bucket_id, name);
if (!obj) {
return nullptr;
}
sqlite::SQLiteVersionedObjects objs_versions(store->db_conn);
auto last_version = objs_versions.get_last_versioned_object(obj->uuid);
if (last_version.has_value()) {
auto new_obj = Object::create_for_query(
obj->name, obj->uuid,
(last_version->object_state == ObjectState::DELETED), last_version->id);
new_obj->update_meta({.size = obj->size,
.etag = obj->etag,
.mtime = obj->mtime,
.set_mtime = obj->set_mtime,
.delete_at = obj->delete_at});
new_obj->update_attrs(last_version->attrs);
new_obj->instance = last_version->version_id;
return std::shared_ptr<Object>(std::move(new_obj));
}
return nullptr;
}

void Bucket::_refresh_objects() {
sqlite::SQLiteObjects objs(store->db_conn);
auto existing = objs.get_objects(info.bucket.bucket_id);
for (const auto& obj : existing) {
auto maybeRef = get_from_db(obj.name);
if (maybeRef) {
objects[obj.name] = *maybeRef;
}
}
}

} // ns rgw::sal::sfs
24 changes: 12 additions & 12 deletions src/rgw/driver/sfs/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,15 @@ class Object {
static Object* create_commit_delete_marker(const rgw_obj_key& key,
SFStore* store,
const std::string& bucket_id);
static Object* create_commit_new_object(const rgw_obj_key& key,
SFStore* store,
const std::string& bucket_id);
static Object* create_commit_new_object(
const rgw_obj_key& key, SFStore* store, const std::string& bucket_id,
const std::string* version_id);

static Object* try_create_with_last_version_from_database_fetch(
SFStore* store, const std::string& name, const std::string& bucket_id);
static Object* try_create_from_database_fetch(
SFStore* store, const std::string& name, const std::string& bucket_id,
const std::string& version_id);

const Meta get_meta() const;
const Meta get_default_meta() const;
Expand All @@ -97,8 +103,8 @@ class Object {

std::filesystem::path get_storage_path() const;

// Add new object version. Sets version_id to the one created
void add_new_version(SFStore* store);
// Update version and commit to database
void update_commit_new_version(SFStore* store, const std::string& version_id);

// Change obj version state.
// Use this for example to update objs to in flight states like
Expand Down Expand Up @@ -314,8 +320,6 @@ class Bucket {
RGWBucketInfo info;
rgw::sal::Attrs attrs;
bool deleted{false};
std::map<std::string, ObjectRef> objects;
ceph::mutex obj_map_lock = ceph::make_mutex("obj_map_lock");

public:
ceph::mutex multipart_map_lock = ceph::make_mutex("multipart_map_lock");
Expand All @@ -324,8 +328,6 @@ class Bucket {
Bucket(const Bucket&) = delete;

private:
std::optional<ObjectRef> get_from_db(const std::string &name);
void _refresh_objects();
void _undelete_object(ObjectRef objref, const rgw_obj_key & key,
sqlite::SQLiteVersionedObjects & sqlite_versioned_objects,
sqlite::DBOPVersionedObjectInfo & last_version);
Expand All @@ -341,7 +343,6 @@ class Bucket {
owner(_owner),
info(_bucket_info),
attrs(_attrs) {
_refresh_objects();
}

const RGWBucketInfo& get_info() const{
Expand Down Expand Up @@ -397,7 +398,7 @@ class Bucket {
}

private:
bool want_new_version(const rgw_obj_key &key, ObjectRef obj);
bool want_specific_version(const rgw_obj_key &key);

public:
// Return object ref for key
Expand Down Expand Up @@ -451,7 +452,6 @@ class Bucket {
}

void finish_multipart(const std::string &upload_id, ObjectRef objref) {
std::lock_guard l1(obj_map_lock);
std::lock_guard l2(multipart_map_lock);

auto it = multiparts.find(upload_id);
Expand Down

0 comments on commit 143d1fc

Please sign in to comment.