Skip to content

Commit

Permalink
Revert D47372992: Multisect successfully blamed "D47372992: [cachelib…
Browse files Browse the repository at this point in the history
…] Make periodic worker start and stop util functions" for test or build failures

Summary:
This diff reverts D47372992.
D47372992 ("[cachelib] Make periodic worker start and stop util functions", by jiayuebao) has been identified as the cause of the following test or build failures:

Tests affected:
- [scm/grepo/integration_tests:commit-journaling - test-commit-journaling.t (run-tests)](https://www.internalfb.com/intern/test/844424993772291/)

Here's the Multisect link:
https://www.internalfb.com/multisect/2429888
Here are the tasks that are relevant to this breakage:

We're generating a revert to back out the changes in this diff. Please note that the backout may land if someone accepts it.

If you believe this diff has been generated in error you may Commandeer and Abandon it.

Reviewed By: crassirostris

Differential Revision: D47462316

fbshipit-source-id: 7a296e78e73c4d2abfd7e8cd738d932b98510d4a
  • Loading branch information
generatedunixname89002005232357 authored and facebook-github-bot committed Jul 14, 2023
1 parent b3d469a commit b74dec6
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 81 deletions.
40 changes: 28 additions & 12 deletions cachelib/allocator/CacheAllocator-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -3419,7 +3419,17 @@ bool CacheAllocator<CacheTrait>::stopWorker(folly::StringPiece name,
std::unique_ptr<T>& worker,
std::chrono::seconds timeout) {
std::lock_guard<std::mutex> l(workersMutex_);
auto ret = util::stopPeriodicWorker(name, worker, timeout);
if (!worker) {
return true;
}

bool ret = worker->stop(timeout);
if (ret) {
XLOGF(DBG1, "Stopped worker '{}'", name);
} else {
XLOGF(ERR, "Couldn't stop worker '{}', timeout: {} seconds", name,
timeout.count());
}
worker.reset();
return ret;
}
Expand All @@ -3431,22 +3441,29 @@ bool CacheAllocator<CacheTrait>::startNewWorker(
std::unique_ptr<T>& worker,
std::chrono::milliseconds interval,
Args&&... args) {
if (worker && !stopWorker(name, worker)) {
if (!stopWorker(name, worker)) {
return false;
}

std::lock_guard<std::mutex> l(workersMutex_);
return util::startPeriodicWorker(name, worker, interval,
std::forward<Args>(args)...);
worker = std::make_unique<T>(*this, std::forward<Args>(args)...);
bool ret = worker->start(interval, name);
if (ret) {
XLOGF(DBG1, "Started worker '{}'", name);
} else {
XLOGF(ERR, "Couldn't start worker '{}', interval: {} milliseconds", name,
interval.count());
}
return ret;
}

template <typename CacheTrait>
bool CacheAllocator<CacheTrait>::startNewPoolRebalancer(
std::chrono::milliseconds interval,
std::shared_ptr<RebalanceStrategy> strategy,
unsigned int freeAllocThreshold) {
if (!startNewWorker("PoolRebalancer", poolRebalancer_, interval, *this,
strategy, freeAllocThreshold)) {
if (!startNewWorker("PoolRebalancer", poolRebalancer_, interval, strategy,
freeAllocThreshold)) {
return false;
}

Expand All @@ -3462,7 +3479,7 @@ bool CacheAllocator<CacheTrait>::startNewPoolResizer(
std::chrono::milliseconds interval,
unsigned int poolResizeSlabsPerIter,
std::shared_ptr<RebalanceStrategy> strategy) {
if (!startNewWorker("PoolResizer", poolResizer_, interval, *this,
if (!startNewWorker("PoolResizer", poolResizer_, interval,
poolResizeSlabsPerIter, strategy)) {
return false;
}
Expand All @@ -3483,8 +3500,8 @@ bool CacheAllocator<CacheTrait>::startNewPoolOptimizer(
// it should do actual size optimization. Probably need to move to using
// the same interval for both, with confirmation of further experiments.
const auto workerInterval = std::chrono::seconds(1);
if (!startNewWorker("PoolOptimizer", poolOptimizer_, workerInterval, *this,
strategy, regularInterval.count(), ccacheInterval.count(),
if (!startNewWorker("PoolOptimizer", poolOptimizer_, workerInterval, strategy,
regularInterval.count(), ccacheInterval.count(),
ccacheStepSizePercent)) {
return false;
}
Expand All @@ -3502,7 +3519,7 @@ bool CacheAllocator<CacheTrait>::startNewMemMonitor(
std::chrono::milliseconds interval,
MemoryMonitor::Config config,
std::shared_ptr<RebalanceStrategy> strategy) {
if (!startNewWorker("MemoryMonitor", memMonitor_, interval, *this, config,
if (!startNewWorker("MemoryMonitor", memMonitor_, interval, config,
strategy)) {
return false;
}
Expand All @@ -3517,8 +3534,7 @@ template <typename CacheTrait>
bool CacheAllocator<CacheTrait>::startNewReaper(
std::chrono::milliseconds interval,
util::Throttler::Config reaperThrottleConfig) {
if (!startNewWorker("Reaper", reaper_, interval, *this,
reaperThrottleConfig)) {
if (!startNewWorker("Reaper", reaper_, interval, reaperThrottleConfig)) {
return false;
}

Expand Down
58 changes: 0 additions & 58 deletions cachelib/common/PeriodicWorker.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
#pragma once

#include <folly/Range.h>
#include <folly/logging/xlog.h>

#include <atomic>
#include <condition_variable>
Expand Down Expand Up @@ -141,62 +140,5 @@ class PeriodicWorker {
/* The main worker loop that handles the work periodically */
void loop(void);
};

namespace util {
// Stops a periodic worker, if one exists.
//
// @param name     name of the worker; only used in log messages
// @param worker   owning pointer to the worker. A null pointer is treated as
//                 "never started" and reported as success.
// @param timeout  forwarded unchanged to the worker's stop() call
// @return true if there was no worker to stop, or if worker->stop(timeout)
//         reported success; false otherwise
//
// Note: the pointer itself is left untouched; this function does not reset it.
template <typename WorkerT>
bool stopPeriodicWorker(folly::StringPiece name,
                        std::unique_ptr<WorkerT>& worker,
                        std::chrono::seconds timeout = std::chrono::seconds{
                            0}) {
  if (worker == nullptr) {
    XLOGF(INFO, "Worker '{}' has not been started. No need to stop.", name);
    return true;
  }

  const bool stopped = worker->stop(timeout);
  if (!stopped) {
    XLOGF(ERR, "Couldn't stop worker '{}', timeout: {} seconds", name,
          timeout.count());
    return false;
  }

  XLOGF(INFO, "Stopped worker '{}'", name);
  return true;
}

// Starts (or restarts) a periodic worker.
//
// If `worker` already holds an instance, that instance is stopped first via
// stopPeriodicWorker(); when that fails, nothing is replaced and false is
// returned. Otherwise a fresh WorkerT is constructed from `args` and started.
//
// @param name      name of the worker; passed to start() and used in logs
// @param worker    owning pointer that receives the newly created worker
// @param interval  the period passed to the worker's start() call
// @param args...   forwarded to the WorkerT constructor
// @return true if the new worker's start() reported success
template <typename WorkerT, typename... Args>
bool startPeriodicWorker(folly::StringPiece name,
                         std::unique_ptr<WorkerT>& worker,
                         std::chrono::milliseconds interval,
                         Args&&... args) {
  if (worker) {
    // An instance is already present: it has to be stopped before replacing it.
    const bool previousStopped = stopPeriodicWorker(name, worker);
    if (!previousStopped) {
      XLOGF(ERR, "Couldn't restart worker '{}' because it couldn't be stopped",
            name);
      return false;
    }
  }

  worker = std::make_unique<WorkerT>(std::forward<Args>(args)...);

  const bool started = worker->start(interval, name);
  if (!started) {
    XLOGF(ERR, "Couldn't start worker '{}', interval: {} milliseconds", name,
          interval.count());
    return false;
  }

  XLOGF(INFO, "Started worker '{}'", name);
  return true;
}

} // namespace util
} // namespace cachelib
} // namespace facebook
54 changes: 47 additions & 7 deletions cachelib/experimental/objcache2/ObjectCache-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,17 +110,15 @@ template <typename AllocatorT>
void ObjectCache<AllocatorT>::initWorkers() {
if (config_.objectSizeTrackingEnabled &&
config_.sizeControllerIntervalMs != 0) {
util::startPeriodicWorker(
kSizeControllerName, sizeController_,
std::chrono::milliseconds{config_.sizeControllerIntervalMs}, *this,
config_.sizeControllerThrottlerConfig);
startWorker(kSizeControllerName, sizeController_,
std::chrono::milliseconds{config_.sizeControllerIntervalMs},
config_.sizeControllerThrottlerConfig);
}

if (config_.objectSizeTrackingEnabled &&
config_.objectSizeDistributionTrackingEnabled) {
util::startPeriodicWorker(
kSizeDistTrackerName, sizeDistTracker_,
std::chrono::seconds{60} /*default interval to be 60s*/, *this);
startWorker(kSizeDistTrackerName, sizeDistTracker_,
std::chrono::seconds{60} /*default interval to be 60s*/);
}
}

Expand Down Expand Up @@ -373,6 +371,48 @@ ObjectCache<AllocatorT>::serializeConfigParams() const {
return config;
}

// Starts a periodic worker owned by this ObjectCache.
//
// stopWorker() is always invoked on `worker` first (it is a no-op that
// succeeds when the pointer is null). If that fails, the existing worker is
// kept and false is returned. Otherwise a new WorkerT is constructed with
// *this followed by `args`, and started with the given interval and name.
//
// @return true if the new worker's start() reported success
template <typename AllocatorT>
template <typename WorkerT, typename... Args>
bool ObjectCache<AllocatorT>::startWorker(folly::StringPiece name,
                                          std::unique_ptr<WorkerT>& worker,
                                          std::chrono::milliseconds interval,
                                          Args&&... args) {
  const bool previousStopped = stopWorker(name, worker);
  if (!previousStopped) {
    XLOGF(ERR, "Worker '{}' is already running. Cannot start it again.", name);
    return false;
  }

  worker = std::make_unique<WorkerT>(*this, std::forward<Args>(args)...);

  const bool started = worker->start(interval, name);
  if (!started) {
    XLOGF(ERR, "Couldn't start worker '{}', interval: {} milliseconds", name,
          interval.count());
    return false;
  }

  XLOGF(INFO, "Started worker '{}'", name);
  return true;
}

// Stops a periodic worker owned by this ObjectCache.
//
// A null `worker` is treated as "never started" and reported as success.
// Otherwise the result of worker->stop(timeout) is logged and returned.
// The pointer itself is left untouched; this function does not reset it.
//
// @return true if there was nothing to stop or stop() reported success
template <typename AllocatorT>
template <typename WorkerT>
bool ObjectCache<AllocatorT>::stopWorker(folly::StringPiece name,
                                         std::unique_ptr<WorkerT>& worker,
                                         std::chrono::seconds timeout) {
  if (worker == nullptr) {
    XLOGF(INFO, "Worker '{}' has not been started. No need to stop.", name);
    return true;
  }

  const bool stopped = worker->stop(timeout);
  if (!stopped) {
    XLOGF(ERR, "Couldn't stop worker '{}', timeout: {} seconds", name,
          timeout.count());
    return false;
  }

  XLOGF(INFO, "Stopped worker '{}'", name);
  return true;
}

template <typename AllocatorT>
bool ObjectCache<AllocatorT>::persist() {
if (config_.persistBaseFilePath.empty() || !config_.serializeCb) {
Expand Down
30 changes: 26 additions & 4 deletions cachelib/experimental/objcache2/ObjectCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -406,10 +406,8 @@ class ObjectCache : public ObjectCacheBase<AllocatorT> {
// @return true if all workers have been successfully stopped
bool stopAllWorkers(std::chrono::seconds timeout = std::chrono::seconds{0}) {
bool success = true;
success &=
util::stopPeriodicWorker(kSizeControllerName, sizeController_, timeout);
success &= util::stopPeriodicWorker(
kSizeDistTrackerName, sizeDistTracker_, timeout);
success &= stopWorker(kSizeControllerName, sizeController_, timeout);
success &= stopWorker(kSizeDistTrackerName, sizeDistTracker_, timeout);
success &= this->l1Cache_->stopWorkers(timeout);
return success;
}
Expand Down Expand Up @@ -445,6 +443,30 @@ class ObjectCache : public ObjectCacheBase<AllocatorT> {
// Returns the total number of placeholders
size_t getNumPlaceholders() const { return placeholders_.size(); }

// Start a periodic worker.
//
// Any worker already held by `worker` is stopped first (via stopWorker); if
// it cannot be stopped, no new worker is created and false is returned.
//
// @param name name of the worker (passed to start() and used in log messages)
// @param worker unique pointer that will own the newly started worker
// @param interval the period this worker fires
// @param args... the rest of the arguments to initialize the worker; the
//                worker is constructed with *this followed by these
// @return true if the worker has been successfully started
template <typename WorkerT, typename... Args>
bool startWorker(folly::StringPiece name,
std::unique_ptr<WorkerT>& worker,
std::chrono::milliseconds interval,
Args&&... args);

// Stop a periodic worker.
//
// @param name name of the worker (used in log messages)
// @param worker unique pointer of the worker to stop; may be null
// @param timeout timeout for the worker stopping (defaults to 0 seconds)
// @return true if the worker has been successfully stopped, or if `worker`
//         is null (nothing to stop)
template <typename WorkerT>
bool stopWorker(folly::StringPiece name,
std::unique_ptr<WorkerT>& worker,
std::chrono::seconds timeout = std::chrono::seconds{0});

// Get a ReadHandle reference from the object shared_ptr
template <typename T>
typename AllocatorT::ReadHandle& getReadHandleRefInternal(
Expand Down

0 comments on commit b74dec6

Please sign in to comment.