Skip to content

Commit

Permalink
Deprecate transfer capacity for shared arbitrator (#11084)
Browse files Browse the repository at this point in the history
Summary: Pull Request resolved: #11084

Reviewed By: xiaoxmeng

Differential Revision: D63472474

Pulled By: tanjialiang

fbshipit-source-id: 47041957eb41069f1435fa035a4f63bd1134c62c
  • Loading branch information
tanjialiang authored and facebook-github-bot committed Sep 26, 2024
1 parent 7a9b141 commit ce2b907
Show file tree
Hide file tree
Showing 7 changed files with 17 additions and 77 deletions.
22 changes: 4 additions & 18 deletions velox/common/memory/SharedArbitrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,16 +109,6 @@ uint64_t SharedArbitrator::ExtraConfig::getMemoryPoolReservedCapacity(
config::CapacityUnit::BYTE);
}

uint64_t SharedArbitrator::ExtraConfig::getMemoryPoolTransferCapacity(
const std::unordered_map<std::string, std::string>& configs) {
return config::toCapacity(
getConfig<std::string>(
configs,
kMemoryPoolTransferCapacity,
std::string(kDefaultMemoryPoolTransferCapacity)),
config::CapacityUnit::BYTE);
}

uint64_t SharedArbitrator::ExtraConfig::getMemoryReclaimMaxWaitTimeMs(
const std::unordered_map<std::string, std::string>& configs) {
return std::chrono::duration_cast<std::chrono::milliseconds>(
Expand Down Expand Up @@ -182,8 +172,6 @@ SharedArbitrator::SharedArbitrator(const Config& config)
ExtraConfig::getMemoryPoolInitialCapacity(config.extraConfigs)),
memoryPoolReservedCapacity_(
ExtraConfig::getMemoryPoolReservedCapacity(config.extraConfigs)),
memoryPoolTransferCapacity_(
ExtraConfig::getMemoryPoolTransferCapacity(config.extraConfigs)),
memoryReclaimWaitMs_(
ExtraConfig::getMemoryReclaimMaxWaitTimeMs(config.extraConfigs)),
globalArbitrationEnabled_(
Expand Down Expand Up @@ -542,7 +530,7 @@ uint64_t SharedArbitrator::getCapacityGrowthTarget(
const MemoryPool& pool,
uint64_t requestBytes) const {
if (fastExponentialGrowthCapacityLimit_ == 0 && slowCapacityGrowPct_ == 0) {
return std::max(requestBytes, memoryPoolTransferCapacity_);
return requestBytes;
}
uint64_t targetBytes{0};
const auto capacity = pool.capacity();
Expand All @@ -551,8 +539,7 @@ uint64_t SharedArbitrator::getCapacityGrowthTarget(
} else {
targetBytes = capacity * slowCapacityGrowPct_;
}
return std::max(
std::max(requestBytes, targetBytes), memoryPoolTransferCapacity_);
return std::max(requestBytes, targetBytes);
}

bool SharedArbitrator::growCapacity(MemoryPool* pool, uint64_t requestBytes) {
Expand Down Expand Up @@ -938,9 +925,8 @@ uint64_t SharedArbitrator::reclaim(
MemoryPool* pool,
uint64_t targetBytes,
bool isLocalArbitration) noexcept {
int64_t bytesToReclaim = std::min<uint64_t>(
std::max(targetBytes, memoryPoolTransferCapacity_),
maxReclaimableCapacity(*pool, true));
int64_t bytesToReclaim =
std::min<uint64_t>(targetBytes, maxReclaimableCapacity(*pool, true));
if (bytesToReclaim == 0) {
return 0;
}
Expand Down
16 changes: 1 addition & 15 deletions velox/common/memory/SharedArbitrator.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,6 @@ class SharedArbitrator : public memory::MemoryArbitrator {
static uint64_t getMemoryPoolReservedCapacity(
const std::unordered_map<std::string, std::string>& configs);

/// The minimal memory capacity to transfer out of or into a memory pool
/// during the memory arbitration.
static constexpr std::string_view kMemoryPoolTransferCapacity{
"memory-pool-transfer-capacity"};
static constexpr std::string_view kDefaultMemoryPoolTransferCapacity{
"128MB"};
static uint64_t getMemoryPoolTransferCapacity(
const std::unordered_map<std::string, std::string>& configs);

/// Specifies the max time to wait for memory reclaim by arbitration. The
/// memory reclaim might fail if the max time has exceeded. This prevents
/// the memory arbitration from getting stuck when the memory reclaim waits
Expand Down Expand Up @@ -231,11 +222,7 @@ class SharedArbitrator : public memory::MemoryArbitrator {
// The adjusted grow bytes based on 'requestBytes'. This 'targetBytes' is a
// best effort target, and hence will not be guaranteed. The adjustment is
// based on 'SharedArbitrator::fastExponentialGrowthCapacityLimit_'
// 'SharedArbitrator::slowCapacityGrowPct_' and
// 'MemoryArbitrator::memoryPoolTransferCapacity_'.
//
// TODO: deprecate 'MemoryArbitrator::memoryPoolTransferCapacity_' once
// exponential growth works well in production.
// 'SharedArbitrator::slowCapacityGrowPct_'
const std::optional<uint64_t> targetBytes;

// The start time of this arbitration operation.
Expand Down Expand Up @@ -516,7 +503,6 @@ class SharedArbitrator : public memory::MemoryArbitrator {
const uint64_t reservedCapacity_;
const uint64_t memoryPoolInitialCapacity_;
const uint64_t memoryPoolReservedCapacity_;
const uint64_t memoryPoolTransferCapacity_;
const uint64_t memoryReclaimWaitMs_;
const bool globalArbitrationEnabled_;
const bool checkUsageLeak_;
Expand Down
11 changes: 6 additions & 5 deletions velox/common/memory/tests/MemoryArbitratorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ TEST_F(MemoryArbitrationTest, queryMemoryCapacity) {
ASSERT_FALSE(manager.arbitrator()->growCapacity(rootPool.get(), 6 << 20));
ASSERT_EQ(rootPool->capacity(), 1 << 20);
ASSERT_TRUE(manager.arbitrator()->growCapacity(rootPool.get(), 2 << 20));
ASSERT_TRUE(manager.arbitrator()->growCapacity(rootPool.get(), 1 << 20));
ASSERT_EQ(rootPool->capacity(), 4 << 20);
ASSERT_EQ(manager.arbitrator()->stats().freeCapacityBytes, 2 << 20);
ASSERT_EQ(manager.arbitrator()->stats().freeReservedCapacityBytes, 2 << 20);
Expand All @@ -154,19 +155,19 @@ TEST_F(MemoryArbitrationTest, queryMemoryCapacity) {
"Exceeded memory pool capacity after attempt to grow capacity through "
"arbitration. Requestor pool name 'leaf-1.0', request size 7.00MB, "
"memory pool capacity 4.00MB, memory pool max capacity 8.00MB");
ASSERT_EQ(manager.arbitrator()->shrinkCapacity(rootPool.get(), 0), 1 << 20);
ASSERT_EQ(manager.arbitrator()->shrinkCapacity(rootPool.get(), 0), 0);
ASSERT_EQ(manager.arbitrator()->shrinkCapacity(leafPool.get(), 0), 0);
ASSERT_EQ(manager.arbitrator()->shrinkCapacity(leafPool.get(), 1), 0);
ASSERT_EQ(manager.arbitrator()->shrinkCapacity(rootPool.get(), 1), 0);
ASSERT_EQ(rootPool->capacity(), 3 << 20);
ASSERT_EQ(rootPool->capacity(), 4 << 20);
static_cast<MemoryPoolImpl*>(rootPool.get())->testingSetReservation(0);
ASSERT_EQ(
manager.arbitrator()->shrinkCapacity(leafPool.get(), 1 << 20), 1 << 20);
ASSERT_EQ(
manager.arbitrator()->shrinkCapacity(rootPool.get(), 1 << 20), 1 << 20);
ASSERT_EQ(rootPool->capacity(), 1 << 20);
ASSERT_EQ(leafPool->capacity(), 1 << 20);
ASSERT_EQ(manager.arbitrator()->shrinkCapacity(leafPool.get(), 0), 1 << 20);
ASSERT_EQ(rootPool->capacity(), 2 << 20);
ASSERT_EQ(leafPool->capacity(), 2 << 20);
ASSERT_EQ(manager.arbitrator()->shrinkCapacity(leafPool.get(), 0), 2 << 20);
ASSERT_EQ(rootPool->capacity(), 0);
ASSERT_EQ(leafPool->capacity(), 0);
}
Expand Down
38 changes: 5 additions & 33 deletions velox/common/memory/tests/MockSharedArbitratorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,6 @@ constexpr int64_t MB = 1024L * KB;
constexpr uint64_t kMemoryCapacity = 512 * MB;
constexpr uint64_t kReservedMemoryCapacity = 128 * MB;
constexpr uint64_t kMemoryPoolInitCapacity = 16 * MB;
// TODO(jtan6): Remove after complete transfer capacity deprecation
constexpr uint64_t kMemoryPoolTransferCapacity = 0;
constexpr uint64_t kMemoryPoolReservedCapacity = 8 * MB;
constexpr uint64_t kFastExponentialGrowthCapacityLimit = 32 * MB;
constexpr double kSlowCapacityGrowPct = 0.25;
Expand Down Expand Up @@ -429,7 +427,6 @@ class MockSharedArbitrationTest : public testing::Test {
int64_t reservedMemoryCapacity = kReservedMemoryCapacity,
uint64_t memoryPoolInitCapacity = kMemoryPoolInitCapacity,
uint64_t memoryPoolReserveCapacity = kMemoryPoolReservedCapacity,
uint64_t memoryPoolTransferCapacity = kMemoryPoolTransferCapacity,
uint64_t fastExponentialGrowthCapacityLimit =
kFastExponentialGrowthCapacityLimit,
double slowCapacityGrowPct = kSlowCapacityGrowPct,
Expand All @@ -450,8 +447,6 @@ class MockSharedArbitrationTest : public testing::Test {
folly::to<std::string>(memoryPoolInitCapacity) + "B"},
{std::string(ExtraConfig::kMemoryPoolReservedCapacity),
folly::to<std::string>(memoryPoolReserveCapacity) + "B"},
{std::string(ExtraConfig::kMemoryPoolTransferCapacity),
folly::to<std::string>(memoryPoolTransferCapacity) + "B"},
{std::string(ExtraConfig::kFastExponentialGrowthCapacityLimit),
folly::to<std::string>(fastExponentialGrowthCapacityLimit) + "B"},
{std::string(ExtraConfig::kSlowCapacityGrowPct),
Expand Down Expand Up @@ -555,10 +550,6 @@ TEST_F(MockSharedArbitrationTest, extraConfigs) {
ASSERT_EQ(
SharedArbitrator::ExtraConfig::getMemoryPoolInitialCapacity(emptyConfigs),
256 << 20);
ASSERT_EQ(
SharedArbitrator::ExtraConfig::getMemoryPoolTransferCapacity(
emptyConfigs),
128 << 20);
ASSERT_EQ(
SharedArbitrator::ExtraConfig::getMemoryReclaimMaxWaitTimeMs(
emptyConfigs),
Expand All @@ -578,8 +569,6 @@ TEST_F(MockSharedArbitrationTest, extraConfigs) {
SharedArbitrator::ExtraConfig::kMemoryPoolInitialCapacity)] = "512MB";
configs[std::string(
SharedArbitrator::ExtraConfig::kMemoryPoolReservedCapacity)] = "200B";
configs[std::string(
SharedArbitrator::ExtraConfig::kMemoryPoolTransferCapacity)] = "256MB";
configs[std::string(
SharedArbitrator::ExtraConfig::kMemoryReclaimMaxWaitTime)] = "5000ms";
configs[std::string(
Expand All @@ -593,9 +582,6 @@ TEST_F(MockSharedArbitrationTest, extraConfigs) {
ASSERT_EQ(
SharedArbitrator::ExtraConfig::getMemoryPoolReservedCapacity(configs),
200);
ASSERT_EQ(
SharedArbitrator::ExtraConfig::getMemoryPoolTransferCapacity(configs),
256 << 20);
ASSERT_EQ(
SharedArbitrator::ExtraConfig::getMemoryReclaimMaxWaitTimeMs(configs),
5000);
Expand All @@ -610,8 +596,6 @@ TEST_F(MockSharedArbitrationTest, extraConfigs) {
SharedArbitrator::ExtraConfig::kMemoryPoolInitialCapacity)] = "invalid";
configs[std::string(
SharedArbitrator::ExtraConfig::kMemoryPoolReservedCapacity)] = "invalid";
configs[std::string(
SharedArbitrator::ExtraConfig::kMemoryPoolTransferCapacity)] = "invalid";
configs[std::string(
SharedArbitrator::ExtraConfig::kMemoryReclaimMaxWaitTime)] = "invalid";
configs[std::string(
Expand All @@ -627,9 +611,6 @@ TEST_F(MockSharedArbitrationTest, extraConfigs) {
VELOX_ASSERT_THROW(
SharedArbitrator::ExtraConfig::getMemoryPoolReservedCapacity(configs),
"Invalid capacity string 'invalid'");
VELOX_ASSERT_THROW(
SharedArbitrator::ExtraConfig::getMemoryPoolTransferCapacity(configs),
"Invalid capacity string 'invalid'");
VELOX_ASSERT_THROW(
SharedArbitrator::ExtraConfig::getMemoryReclaimMaxWaitTimeMs(configs),
"Invalid duration 'invalid'");
Expand Down Expand Up @@ -678,7 +659,7 @@ TEST_F(MockSharedArbitrationTest, arbitrationStateCheck) {
ASSERT_TRUE(RE2::FullMatch(pool.name(), re)) << pool.name();
++checkCount;
};
setupMemory(memCapacity, 0, 0, 0, 0, 0, 0, 0, 0, checkCountCb);
setupMemory(memCapacity, 0, 0, 0, 0, 0, 0, 0, checkCountCb);

const int numTasks{5};
std::vector<std::shared_ptr<MockTask>> tasks;
Expand All @@ -703,7 +684,7 @@ TEST_F(MockSharedArbitrationTest, arbitrationStateCheck) {
MemoryArbitrationStateCheckCB badCheckCb = [&](MemoryPool& /*unused*/) {
VELOX_FAIL("bad check");
};
setupMemory(memCapacity, 0, 0, 0, 0, 0, 0, 0, 0, badCheckCb);
setupMemory(memCapacity, 0, 0, 0, 0, 0, 0, 0, badCheckCb);
std::shared_ptr<MockTask> task = addTask(kMemoryCapacity);
ASSERT_EQ(task->capacity(), 0);
MockMemoryOperator* memOp = task->addMemoryOp();
Expand Down Expand Up @@ -1238,7 +1219,6 @@ DEBUG_ONLY_TEST_F(
0,
memoryPoolInitCapacity,
0,
0,
kFastExponentialGrowthCapacityLimit,
kSlowCapacityGrowPct,
0,
Expand Down Expand Up @@ -1331,7 +1311,6 @@ DEBUG_ONLY_TEST_F(
0,
memoryPoolInitCapacity,
0,
0,
kFastExponentialGrowthCapacityLimit,
kSlowCapacityGrowPct,
0,
Expand Down Expand Up @@ -1615,7 +1594,6 @@ DEBUG_ONLY_TEST_F(MockSharedArbitrationTest, globalArbitrationEnableCheck) {
0,
memoryPoolInitCapacity,
0,
0,
kFastExponentialGrowthCapacityLimit,
kSlowCapacityGrowPct,
kMemoryPoolMinFreeCapacity,
Expand Down Expand Up @@ -1652,7 +1630,6 @@ DEBUG_ONLY_TEST_F(
0,
memoryPoolInitCapacity,
0,
0,
kFastExponentialGrowthCapacityLimit,
kSlowCapacityGrowPct,
0,
Expand Down Expand Up @@ -1871,7 +1848,6 @@ TEST_F(MockSharedArbitrationTest, singlePoolShrinkWithoutArbitration) {
0,
0,
0,
0,
testParam.memoryPoolMinFreeCapacity,
testParam.memoryPoolMinFreeCapacityPct),
"both need to be set (non-zero) at the same time to enable shrink "
Expand All @@ -1885,7 +1861,6 @@ TEST_F(MockSharedArbitrationTest, singlePoolShrinkWithoutArbitration) {
0,
0,
0,
0,
testParam.memoryPoolMinFreeCapacity,
testParam.memoryPoolMinFreeCapacityPct);
}
Expand Down Expand Up @@ -1931,7 +1906,6 @@ TEST_F(MockSharedArbitrationTest, singlePoolGrowWithoutArbitration) {
0,
memoryPoolInitCapacity,
0,
0,
testParam.fastExponentialGrowthCapacityLimit,
testParam.slowCapacityGrowPct);

Expand Down Expand Up @@ -2146,7 +2120,6 @@ TEST_F(MockSharedArbitrationTest, ensureMemoryPoolMaxCapacity) {
0,
poolInitCapacity,
0,
0,
kFastExponentialGrowthCapacityLimit,
kSlowCapacityGrowPct,
0,
Expand Down Expand Up @@ -2681,7 +2654,7 @@ DEBUG_ONLY_TEST_F(MockSharedArbitrationTest, failedToReclaimFromRequestor) {
0}};
for (const auto& testData : testSettings) {
SCOPED_TRACE(testData.debugString());
setupMemory(kMemoryCapacity, 0, kMemoryPoolInitCapacity, 0, 0, 0, 0, 0, 0);
setupMemory(kMemoryCapacity, 0, kMemoryPoolInitCapacity, 0, 0, 0, 0, 0);

std::vector<std::shared_ptr<MockTask>> otherTasks;
std::vector<MockMemoryOperator*> otherTaskOps;
Expand Down Expand Up @@ -2865,7 +2838,7 @@ DEBUG_ONLY_TEST_F(MockSharedArbitrationTest, failedToReclaimFromOtherTask) {
nonFailTaskMemoryCapacity}};
for (const auto& testData : testSettings) {
SCOPED_TRACE(testData.debugString());
setupMemory(kMemoryCapacity, 0, kMemoryPoolInitCapacity, 0, 0, 0, 0, 0, 0);
setupMemory(kMemoryCapacity, 0, kMemoryPoolInitCapacity, 0, 0, 0, 0, 0);

std::vector<std::shared_ptr<MockTask>> nonFailedTasks;
std::vector<MockMemoryOperator*> nonFailedTaskOps;
Expand Down Expand Up @@ -3015,7 +2988,6 @@ TEST_F(MockSharedArbitrationTest, memoryPoolAbortThrow) {
0,
kMemoryPoolInitCapacity,
0,
0,
kFastExponentialGrowthCapacityLimit,
kSlowCapacityGrowPct,
0,
Expand Down Expand Up @@ -3065,7 +3037,7 @@ TEST_F(MockSharedArbitrationTest, memoryPoolAbortThrow) {

// This test makes sure the memory capacity grows as expected.
DEBUG_ONLY_TEST_F(MockSharedArbitrationTest, concurrentArbitrationRequests) {
setupMemory(kMemoryCapacity, 0, 0, 0, 128 << 20);
setupMemory(kMemoryCapacity, 0, 0, 0);
std::shared_ptr<MockTask> task = addTask();
MockMemoryOperator* op1 = addMemoryOp(task);
MockMemoryOperator* op2 = addMemoryOp(task);
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/tests/HashJoinTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7506,7 +7506,7 @@ DEBUG_ONLY_TEST_F(HashJoinTest, taskWaitTimeout) {

for (uint64_t timeoutMs : {0, 1'000, 30'000}) {
SCOPED_TRACE(fmt::format("timeout {}", succinctMillis(timeoutMs)));
auto memoryManager = createMemoryManager(512 << 20, 0, 0, timeoutMs);
auto memoryManager = createMemoryManager(512 << 20, 0, timeoutMs);
auto queryCtx =
newQueryCtx(memoryManager.get(), executor_.get(), queryMemoryCapacity);

Expand Down
3 changes: 0 additions & 3 deletions velox/exec/tests/utils/ArbitratorTestUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ std::shared_ptr<core::QueryCtx> newQueryCtx(
std::unique_ptr<memory::MemoryManager> createMemoryManager(
int64_t arbitratorCapacity,
uint64_t memoryPoolInitCapacity,
uint64_t memoryPoolTransferCapacity,
uint64_t maxReclaimWaitMs,
uint64_t fastExponentialGrowthCapacityLimit,
double slowCapacityGrowPct) {
Expand All @@ -59,8 +58,6 @@ std::unique_ptr<memory::MemoryManager> createMemoryManager(
options.extraArbitratorConfigs = {
{std::string(ExtraConfig::kMemoryPoolInitialCapacity),
folly::to<std::string>(memoryPoolInitCapacity) + "B"},
{std::string(ExtraConfig::kMemoryPoolTransferCapacity),
folly::to<std::string>(memoryPoolTransferCapacity) + "B"},
{std::string(ExtraConfig::kMemoryReclaimMaxWaitTime),
folly::to<std::string>(maxReclaimWaitMs) + "ms"},
{std::string(ExtraConfig::kGlobalArbitrationEnabled), "true"},
Expand Down
2 changes: 0 additions & 2 deletions velox/exec/tests/utils/ArbitratorTestUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ constexpr int64_t MB = 1024L * KB;

constexpr uint64_t kMemoryCapacity = 512 * MB;
constexpr uint64_t kMemoryPoolInitCapacity = 16 * MB;
constexpr uint64_t kMemoryPoolTransferCapacity = 8 * MB;

class FakeMemoryReclaimer : public exec::MemoryReclaimer {
public:
Expand Down Expand Up @@ -95,7 +94,6 @@ std::shared_ptr<core::QueryCtx> newQueryCtx(
std::unique_ptr<memory::MemoryManager> createMemoryManager(
int64_t arbitratorCapacity = kMemoryCapacity,
uint64_t memoryPoolInitCapacity = kMemoryPoolInitCapacity,
uint64_t memoryPoolTransferCapacity = kMemoryPoolTransferCapacity,
uint64_t maxReclaimWaitMs = 0,
uint64_t fastExponentialGrowthCapacityLimit = 0,
double slowCapacityGrowPct = 0);
Expand Down

0 comments on commit ce2b907

Please sign in to comment.