Skip to content

Commit

Permalink
Improve online delete backend locking
Browse files Browse the repository at this point in the history
  • Loading branch information
miguelportilla committed Apr 17, 2020
1 parent 020b285 commit fc00054
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 87 deletions.
23 changes: 8 additions & 15 deletions src/ripple/app/misc/SHAMapStoreImp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -446,23 +446,16 @@ SHAMapStoreImp::run()
;
}

std::string nextArchiveDir =
dbRotating_->getWritableBackend()->getName();
lastRotated_ = validatedSeq;
std::shared_ptr<NodeStore::Backend> oldBackend;
{
std::lock_guard lock (dbRotating_->peekMutex());

state_db_.setState (SavedState {newBackend->getName(),
nextArchiveDir, lastRotated_});
clearCaches (validatedSeq);
oldBackend = dbRotating_->rotateBackends(
std::move(newBackend),
lock);
}
JLOG(journal_.warn()) << "finished rotation " << validatedSeq;
state_db_.setState(SavedState {
newBackend->getName(),
dbRotating_->getName(),
lastRotated_});

oldBackend->setDeletePath();
clearCaches(validatedSeq);
dbRotating_->rotateBackends(std::move(newBackend));

JLOG(journal_.warn()) << "finished rotation " << validatedSeq;
}
}
}
Expand Down
12 changes: 2 additions & 10 deletions src/ripple/nodestore/DatabaseRotating.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,9 @@ class DatabaseRotating : public Database
TaggedCache<uint256, NodeObject> const&
getPositiveCache() = 0;

virtual std::mutex& peekMutex() const = 0;

virtual
std::shared_ptr<Backend> const&
getWritableBackend() const = 0;

virtual
std::shared_ptr<Backend>
rotateBackends(
std::shared_ptr<Backend> newBackend,
std::lock_guard<std::mutex> const&) = 0;
void
rotateBackends(std::shared_ptr<Backend> newBackend) = 0;
};

}
Expand Down
98 changes: 87 additions & 11 deletions src/ripple/nodestore/impl/DatabaseRotatingImp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,54 @@ DatabaseRotatingImp::DatabaseRotatingImp(
setParent(parent);
}

std::shared_ptr<Backend>
DatabaseRotatingImp::rotateBackends(
std::shared_ptr<Backend> newBackend,
std::lock_guard<std::mutex> const&)
void
DatabaseRotatingImp::rotateBackends(std::shared_ptr<Backend> newBackend)
{
auto oldBackend {std::move(archiveBackend_)};
std::lock_guard lock(mutex_);
archiveBackend_->setDeletePath();
archiveBackend_ = std::move(writableBackend_);
writableBackend_ = std::move(newBackend);
return oldBackend;
}

std::string
DatabaseRotatingImp::getName() const
{
std::lock_guard lock(mutex_);
return writableBackend_->getName();
}

std::int32_t
DatabaseRotatingImp::getWriteLoad() const
{
std::lock_guard lock(mutex_);
return writableBackend_->getWriteLoad();
}

void
DatabaseRotatingImp::import(Database& source)
{
auto const backend = [&] {
std::lock_guard lock(mutex_);
return writableBackend_;
}();

importInternal(*backend, source);
}

bool
DatabaseRotatingImp::storeLedger(std::shared_ptr<Ledger const> const& srcLedger)
{
auto const backend = [&] {
std::lock_guard lock(mutex_);
return writableBackend_;
}();

return Database::storeLedger(
*srcLedger,
backend,
pCache_,
nCache_,
nullptr);
}

void
Expand All @@ -65,7 +104,13 @@ DatabaseRotatingImp::store(NodeObjectType type, Blob&& data,
{
auto nObj = NodeObject::createObject(type, std::move(data), hash);
pCache_->canonicalize_replace_cache(hash, nObj);
getWritableBackend()->store(nObj);

auto const backend = [&] {
std::lock_guard lock(mutex_);
return writableBackend_;
}();
backend->store(nObj);

nCache_->erase(hash);
storeStats(nObj->getData().size());
}
Expand All @@ -78,6 +123,7 @@ DatabaseRotatingImp::asyncFetch(uint256 const& hash,
object = pCache_->fetch(hash);
if (object || nCache_->touch_if_exists(hash))
return true;

// Otherwise post a read
Database::asyncFetch(hash, seq, pCache_, nCache_);
return false;
Expand All @@ -102,19 +148,49 @@ DatabaseRotatingImp::sweep()
std::shared_ptr<NodeObject>
DatabaseRotatingImp::fetchFrom(uint256 const& hash, std::uint32_t seq)
{
Backends b = getBackends();
auto nObj = fetchInternal(hash, b.writableBackend);
auto backends = [&] {
std::lock_guard lock(mutex_);
return std::make_pair(writableBackend_, archiveBackend_);
}();

// Try to fetch from the writable backend
auto nObj = fetchInternal(hash, backends.first);
if (! nObj)
{
nObj = fetchInternal(hash, b.archiveBackend);
// Otherwise try to fetch from the archive backend
nObj = fetchInternal(hash, backends.second);
if (nObj)
{
getWritableBackend()->store(nObj);
{
// Refresh the writable backend pointer
std::lock_guard lock(mutex_);
backends.first = writableBackend_;
}

// Update writable backend with data from the archive backend
backends.first->store(nObj);
nCache_->erase(hash);
}
}
return nObj;
}

void
DatabaseRotatingImp::for_each(
std::function <void(std::shared_ptr<NodeObject>)> f)
{
auto const backends = [&] {
std::lock_guard lock(mutex_);
return std::make_pair(writableBackend_, archiveBackend_);
}();

// Iterate the writable backend
backends.first->for_each(f);

// Iterate the archive backend
backends.second->for_each(f);
}


} // NodeStore
} // ripple
62 changes: 11 additions & 51 deletions src/ripple/nodestore/impl/DatabaseRotatingImp.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,37 +48,17 @@ class DatabaseRotatingImp : public DatabaseRotating
stopThreads();
}

std::shared_ptr<Backend> const&
getWritableBackend() const override
{
std::lock_guard lock (rotateMutex_);
return writableBackend_;
}

std::shared_ptr<Backend>
rotateBackends(
std::shared_ptr<Backend> newBackend,
std::lock_guard<std::mutex> const&) override;

std::mutex& peekMutex() const override
{
return rotateMutex_;
}
void
rotateBackends(std::shared_ptr<Backend> newBackend) override;

std::string getName() const override
{
return getWritableBackend()->getName();
}
std::string
getName() const override;

std::int32_t getWriteLoad() const override
{
return getWritableBackend()->getWriteLoad();
}
std::int32_t
getWriteLoad() const override;

void import (Database& source) override
{
importInternal (*getWritableBackend(), source);
}
void
import(Database& source) override;

void store(NodeObjectType type, Blob&& data,
uint256 const& hash, std::uint32_t seq) override;
Expand All @@ -94,11 +74,7 @@ class DatabaseRotatingImp : public DatabaseRotating
std::shared_ptr<NodeObject>& object) override;

bool
storeLedger(std::shared_ptr<Ledger const> const& srcLedger) override
{
return Database::storeLedger(
*srcLedger, getWritableBackend(), pCache_, nCache_, nullptr);
}
storeLedger(std::shared_ptr<Ledger const> const& srcLedger) override;

int
getDesiredAsyncReadCount(std::uint32_t seq) override
Expand Down Expand Up @@ -130,29 +106,13 @@ class DatabaseRotatingImp : public DatabaseRotating

std::shared_ptr<Backend> writableBackend_;
std::shared_ptr<Backend> archiveBackend_;
mutable std::mutex rotateMutex_;

struct Backends {
std::shared_ptr<Backend> const& writableBackend;
std::shared_ptr<Backend> const& archiveBackend;
};

Backends getBackends() const
{
std::lock_guard lock (rotateMutex_);
return Backends {writableBackend_, archiveBackend_};
}
mutable std::mutex mutex_;

std::shared_ptr<NodeObject> fetchFrom(
uint256 const& hash, std::uint32_t seq) override;

void
for_each(std::function <void(std::shared_ptr<NodeObject>)> f) override
{
Backends b = getBackends();
b.archiveBackend->for_each(f);
b.writableBackend->for_each(f);
}
for_each(std::function <void(std::shared_ptr<NodeObject>)> f) override;
};

}
Expand Down

0 comments on commit fc00054

Please sign in to comment.