Skip to content

Commit

Permalink
fix: Reuse vector in LocalPartition (#12002)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: #12002

X-link: facebookincubator/nimble#122

More than 10% of the CPU time is spent on the destruction of local partition output when the load is high.

Also add some optimizations for serialization. The optimization on `ByteOutputStream::appendBool` does not show a significant gain in the example query (because there are a lot of small batches), but it is a net gain and would be significant for large batches, so I leave it in the code.

Reviewed By: xiaoxmeng

Differential Revision: D67742489

fbshipit-source-id: 8e70dd128f31caa7909ed7c1e2b4ac1e59d7c87d
  • Loading branch information
Yuhta authored and facebook-github-bot committed Jan 10, 2025
1 parent ec2740f commit 9dcfd39
Show file tree
Hide file tree
Showing 14 changed files with 244 additions and 134 deletions.
30 changes: 0 additions & 30 deletions velox/common/memory/ByteStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,36 +197,6 @@ size_t ByteOutputStream::size() const {
return total + std::max(ranges_.back().position, lastRangeEnd_);
}

// Appends 'count' bits, each equal to 'value', starting at the current
// position of the current range. The position is used as a bit index here.
// Must only be called on a stream in bit mode (debug-checked via 'isBits_').
void ByteOutputStream::appendBool(bool value, int32_t count) {
VELOX_DCHECK(isBits_);

// Fast path: a single bit that still fits in the current range.
if (count == 1 && current_->size > current_->position) {
bits::setBit(
reinterpret_cast<uint64_t*>(current_->buffer),
current_->position,
value);
++current_->position;
return;
}

// General path: fill as many bits as fit in the current range, then ask for
// more space and repeat until all 'count' bits are written.
int32_t offset{0};
for (;;) {
// Number of bits that can be written before the current range is full.
const int32_t bitsFit =
std::min(count - offset, current_->size - current_->position);
bits::fillBits(
reinterpret_cast<uint64_t*>(current_->buffer),
current_->position,
current_->position + bitsFit,
value);
current_->position += bitsFit;
offset += bitsFit;
if (offset == count) {
return;
}
// NOTE(review): relies on extend() making room in 'current_' for the
// remaining bits (size passed in bytes) -- extend() is not visible here.
extend(bits::nbytes(count - offset));
}
}

void ByteOutputStream::appendBits(
const uint64_t* bits,
int32_t begin,
Expand Down
30 changes: 29 additions & 1 deletion velox/common/memory/ByteStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,35 @@ class ByteOutputStream {
current_->position += sizeof(T) * values.size();
}

void appendBool(bool value, int32_t count);
// Writes 'count' copies of the bit 'value' at the current bit position of the
// stream. Only valid for a stream in bit mode (debug-checked).
inline void appendBool(bool value, int32_t count) {
  VELOX_DCHECK(isBits_);

  // Single-bit fast path when the current range still has room.
  if (count == 1 && current_->size > current_->position) {
    auto* const words = reinterpret_cast<uint64_t*>(current_->buffer);
    bits::setBit(words, current_->position, value);
    ++current_->position;
    return;
  }

  // Bulk path: write what fits, grow, and continue with the remainder.
  int32_t written{0};
  while (true) {
    const int32_t chunk =
        std::min(count - written, current_->size - current_->position);
    // 'current_' is re-read on every iteration because extend() runs between
    // iterations.
    auto* const words = reinterpret_cast<uint64_t*>(current_->buffer);
    bits::fillBits(
        words, current_->position, current_->position + chunk, value);
    current_->position += chunk;
    written += chunk;
    if (written == count) {
      return;
    }
    // Request space (in bytes) for the bits that are still pending.
    extend(bits::nbytes(count - written));
  }
}

// A fast path for appending bits into pre-cleared buffers after first extend.
inline void
Expand Down
88 changes: 62 additions & 26 deletions velox/exec/LocalPartition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,31 @@ std::vector<ContinuePromise> LocalExchangeMemoryManager::decreaseMemoryUsage(
return promises;
}

// Offers 'vector' to the pool. The vector is kept only if adding 'size' does
// not push the accounted total above 'capacity_'; otherwise the call is a
// no-op. The bookkeeping happens under the pool's write lock.
void LocalExchangeVectorPool::push(const RowVectorPtr& vector, int64_t size) {
  pool_.withWLock([&](auto& pool) {
    const auto newTotal = totalSize_ + size;
    if (newTotal > capacity_) {
      // Over capacity: do not retain the vector.
      return;
    }
    pool.emplace(vector, size);
    totalSize_ = newTotal;
  });
}

// Takes entries off the front of the pool until one is found whose vector has
// no other owner (use_count() == 1) and returns it. Entries that are still
// referenced elsewhere are dropped from the pool. Returns nullptr once the
// pool is empty.
RowVectorPtr LocalExchangeVectorPool::pop() {
  return pool_.withWLock([&](auto& pool) -> RowVectorPtr {
    while (!pool.empty()) {
      auto entry = std::move(pool.front());
      pool.pop();
      // Un-account the entry regardless of whether it ends up being reused.
      totalSize_ -= entry.second;
      VELOX_CHECK_GE(totalSize_, 0);
      if (entry.first.use_count() != 1) {
        // Someone else still holds this vector; skip it.
        continue;
      }
      return entry.first;
    }
    // An empty pool must have nothing left in the size accounting.
    VELOX_CHECK_EQ(totalSize_, 0);
    return nullptr;
  });
}

void LocalExchangeQueue::addProducer() {
queue_.withWLock([&](auto& /*queue*/) {
VELOX_CHECK(!noMoreProducers_, "addProducer called after noMoreProducers");
Expand Down Expand Up @@ -124,6 +149,7 @@ BlockingReason LocalExchangeQueue::next(
ContinueFuture* future,
memory::MemoryPool* pool,
RowVectorPtr* data) {
int64_t size;
std::vector<ContinuePromise> memoryPromises;
auto blockingReason = queue_.withWLock([&](auto& queue) {
*data = nullptr;
Expand All @@ -138,15 +164,17 @@ BlockingReason LocalExchangeQueue::next(
return BlockingReason::kWaitForProducer;
}

int64_t size;
std::tie(*data, size) = queue.front();
std::tie(*data, size) = std::move(queue.front());
queue.pop();

memoryPromises = memoryManager_->decreaseMemoryUsage(size);

return BlockingReason::kNotBlocked;
});
notify(memoryPromises);
if (*data != nullptr) {
vectorPool_->push(*data, size);
}
return blockingReason;
}

Expand Down Expand Up @@ -289,34 +317,44 @@ void LocalPartition::allocateIndexBuffers(
RowVectorPtr LocalPartition::wrapChildren(
const RowVectorPtr& input,
vector_size_t size,
BufferPtr indices) {
VELOX_CHECK_EQ(childVectors_.size(), input->type()->size());
const BufferPtr& indices,
RowVectorPtr reusable) {
if (!reusable) {
reusable = std::make_shared<RowVector>(
pool(),
input->type(),
nullptr,
size,
std::vector<VectorPtr>(input->childrenSize()));
} else {
VELOX_CHECK(!reusable->mayHaveNulls());
VELOX_CHECK_EQ(reusable.use_count(), 1);
reusable->unsafeResize(size);
}

for (auto i = 0; i < input->type()->size(); ++i) {
childVectors_[i] = BaseVector::wrapInDictionary(
BufferPtr(nullptr), indices, size, input->childAt(i));
for (auto i = 0; i < input->childrenSize(); ++i) {
auto& child = reusable->childAt(i);
if (child && child->encoding() == VectorEncoding::Simple::DICTIONARY &&
child.use_count() == 1) {
child->BaseVector::resize(size);
child->setWrapInfo(indices);
child->setValueVector(input->childAt(i));
} else {
child = BaseVector::wrapInDictionary(
nullptr, indices, size, input->childAt(i));
}
}

return std::make_shared<RowVector>(
input->pool(), input->type(), BufferPtr(nullptr), size, childVectors_);
reusable->updateContainsLazyNotLoaded();
return reusable;
}

void LocalPartition::addInput(RowVectorPtr input) {
prepareForInput(input);

if (numPartitions_ == 1) {
ContinueFuture future;
auto blockingReason =
queues_[0]->enqueue(input, input->retainedSize(), &future);
if (blockingReason != BlockingReason::kNotBlocked) {
blockingReasons_.push_back(blockingReason);
futures_.push_back(std::move(future));
}
return;
}

const auto singlePartition =
partitionFunction_->partition(*input, partitions_);
const auto singlePartition = numPartitions_ == 1
? 0
: partitionFunction_->partition(*input, partitions_);
if (singlePartition.has_value()) {
ContinueFuture future;
auto blockingReason = queues_[singlePartition.value()]->enqueue(
Expand Down Expand Up @@ -349,7 +387,8 @@ void LocalPartition::addInput(RowVectorPtr input) {
// Do not enqueue empty partitions.
continue;
}
auto partitionData = wrapChildren(input, partitionSize, indexBuffers_[i]);
auto partitionData = wrapChildren(
input, partitionSize, indexBuffers_[i], queues_[i]->getVector());
ContinueFuture future;
auto reason = queues_[i]->enqueue(
partitionData, totalSize * partitionSize / numInput, &future);
Expand All @@ -371,9 +410,6 @@ void LocalPartition::prepareForInput(RowVectorPtr& input) {
for (auto& child : input->children()) {
child->loadedVector();
}
if (childVectors_.empty()) {
childVectors_.resize(input->type()->size());
}
}

BlockingReason LocalPartition::isBlocked(ContinueFuture* future) {
Expand Down
34 changes: 31 additions & 3 deletions velox/exec/LocalPartition.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,24 @@ class LocalExchangeMemoryManager {
std::vector<ContinuePromise> promises_;
};

/// A pool of RowVectors (and the DictionaryVectors they hold) kept around for
/// reuse. Only exclusively owned vectors will be reused: `pop' returns a
/// vector only when the pool holds its sole reference.
class LocalExchangeVectorPool {
public:
/// `capacity' is the upper bound on the sum of the `size' values of the
/// vectors retained by the pool.
explicit LocalExchangeVectorPool(int64_t capacity) : capacity_(capacity) {}

/// Adds `vector' to the pool unless doing so would exceed the capacity, in
/// which case the vector is not retained.
/// `size' is the estimated size of the `vector' (e.g. taking shared
/// dictionary into consideration).
void push(const RowVectorPtr& vector, int64_t size);

/// Removes vectors from the front of the pool until an exclusively owned one
/// is found and returns it; vectors that are still referenced elsewhere are
/// dropped. Returns nullptr if no reusable vector is left.
RowVectorPtr pop();

private:
// Maximum total of the sizes of pooled vectors.
const int64_t capacity_;
// Sum of the sizes of the vectors currently in `pool_'. Read and written
// while holding the write lock of `pool_'.
int64_t totalSize_{0};
// FIFO of (vector, size) entries.
folly::Synchronized<std::queue<std::pair<RowVectorPtr, int64_t>>> pool_;
};

/// Buffers data for a single partition produced by local exchange. Allows
/// multiple producers to enqueue data and multiple consumers fetch data. Each
/// producer must be registered with a call to 'addProducer'. 'noMoreProducers'
Expand All @@ -63,8 +81,11 @@ class LocalExchangeQueue {
public:
LocalExchangeQueue(
std::shared_ptr<LocalExchangeMemoryManager> memoryManager,
std::shared_ptr<LocalExchangeVectorPool> vectorPool,
int partition)
: memoryManager_{std::move(memoryManager)}, partition_{partition} {}
: memoryManager_{std::move(memoryManager)},
vectorPool_{std::move(vectorPool)},
partition_{partition} {}

std::string toString() const {
return fmt::format("LocalExchangeQueue({})", partition_);
Expand Down Expand Up @@ -99,6 +120,12 @@ class LocalExchangeQueue {
/// called before all the data has been processed. No-op otherwise.
void close();

/// Gets a reusable vector from this queue's vector pool by forwarding to
/// LocalExchangeVectorPool::pop(). Returns nullptr if none is available.
RowVectorPtr getVector() {
return vectorPool_->pop();
}

/// Returns true if all producers have sent no more data signal.
bool testingProducersDone() const;

Expand All @@ -108,6 +135,7 @@ class LocalExchangeQueue {
bool isFinishedLocked(const Queue& queue) const;

const std::shared_ptr<LocalExchangeMemoryManager> memoryManager_;
const std::shared_ptr<LocalExchangeVectorPool> vectorPool_;
const int partition_;

folly::Synchronized<Queue> queue_;
Expand Down Expand Up @@ -196,7 +224,8 @@ class LocalPartition : public Operator {
RowVectorPtr wrapChildren(
const RowVectorPtr& input,
vector_size_t size,
BufferPtr indices);
const BufferPtr& indices,
RowVectorPtr reusable);

const std::vector<std::shared_ptr<LocalExchangeQueue>> queues_;
const size_t numPartitions_;
Expand All @@ -210,7 +239,6 @@ class LocalPartition : public Operator {
/// Reusable buffers for input partitioning.
std::vector<BufferPtr> indexBuffers_;
std::vector<vector_size_t*> rawIndices_;
std::vector<VectorPtr> childVectors_;
};

} // namespace facebook::velox::exec
3 changes: 2 additions & 1 deletion velox/exec/ScaleWriterLocalPartition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,8 @@ void ScaleWriterPartitioningLocalPartition::addInput(RowVectorPtr input) {
auto writerInput = wrapChildren(
input,
writerRowCount,
std::move(writerAssignmmentIndicesBuffers_[i]));
std::move(writerAssignmmentIndicesBuffers_[i]),
queues_[i]->getVector());
ContinueFuture future;
auto reason = queues_[i]->enqueue(
writerInput, totalInputBytes * writerRowCount / numInput, &future);
Expand Down
7 changes: 4 additions & 3 deletions velox/exec/Task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2609,11 +2609,12 @@ void Task::createLocalExchangeQueuesLocked(
LocalExchangeState exchange;
exchange.memoryManager = std::make_shared<LocalExchangeMemoryManager>(
queryCtx_->queryConfig().maxLocalExchangeBufferSize());

exchange.vectorPool = std::make_shared<LocalExchangeVectorPool>(
queryCtx_->queryConfig().maxLocalExchangeBufferSize());
exchange.queues.reserve(numPartitions);
for (auto i = 0; i < numPartitions; ++i) {
exchange.queues.emplace_back(
std::make_shared<LocalExchangeQueue>(exchange.memoryManager, i));
exchange.queues.emplace_back(std::make_shared<LocalExchangeQueue>(
exchange.memoryManager, exchange.vectorPool, i));
}

const auto partitionNode =
Expand Down
3 changes: 2 additions & 1 deletion velox/exec/TaskStructs.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ struct SplitsState {
/// Stores local exchange queues with the memory manager.
struct LocalExchangeState {
std::shared_ptr<LocalExchangeMemoryManager> memoryManager;
std::shared_ptr<LocalExchangeVectorPool> vectorPool;
std::vector<std::shared_ptr<LocalExchangeQueue>> queues;
std::shared_ptr<common::SkewedPartitionRebalancer>
scaleWriterPartitionBalancer;
Expand Down Expand Up @@ -132,7 +133,7 @@ struct SplitGroupState {
uint32_t numFinishedOutputDrivers{0};

// True if the state contains structures used for connecting ungrouped
// execution pipeline with grouped excution pipeline. In that case we don't
// execution pipeline with grouped execution pipeline. In that case we don't
// want to clean up some of these structures.
bool mixedExecutionMode{false};

Expand Down
31 changes: 28 additions & 3 deletions velox/exec/tests/LocalPartitionTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@
#include "velox/exec/tests/utils/PlanBuilder.h"
#include "velox/functions/prestosql/window/WindowFunctionsRegistration.h"

using namespace facebook::velox;
using namespace facebook::velox::exec;
using namespace facebook::velox::exec::test;
namespace facebook::velox::exec::test {
namespace {

class LocalPartitionTest : public HiveConnectorTestBase {
protected:
Expand Down Expand Up @@ -928,3 +927,29 @@ TEST_F(
tasks[0]->requestAbort().wait();
thread.join();
}

// Exercises LocalExchangeVectorPool: capacity enforcement on push, FIFO order
// on pop, and skipping of vectors that are referenced outside the pool.
TEST_F(LocalPartitionTest, vectorPool) {
  LocalExchangeVectorPool vectorPool(10);
  // Raw pointers of every vector created, in creation order, for identity
  // checks below.
  std::vector<RowVector*> created;
  auto newVector = [&] {
    auto rowVector =
        BaseVector::create<RowVector>(ROW({"c0"}, {BIGINT()}), 1, pool());
    created.push_back(rowVector.get());
    return rowVector;
  };

  // Sizes 5 + 2 + 3 fill the capacity of 10; the last push (size 1) does not
  // fit and is not retained.
  vectorPool.push(newVector(), 5);
  auto shared = newVector();
  vectorPool.push(shared, 2);
  vectorPool.push(newVector(), 3);
  vectorPool.push(newVector(), 1);

  // The first vector pushed is exclusively owned by the pool.
  auto popped = vectorPool.pop();
  ASSERT_TRUE(popped != nullptr);
  ASSERT_EQ(popped.get(), created[0]);

  // The second vector is still held by 'shared', so it is skipped and the
  // third one is returned.
  popped = vectorPool.pop();
  ASSERT_TRUE(popped != nullptr);
  ASSERT_EQ(popped.get(), created[2]);

  // Nothing reusable remains.
  ASSERT_FALSE(vectorPool.pop());
}

} // namespace
} // namespace facebook::velox::exec::test
Loading

0 comments on commit 9dcfd39

Please sign in to comment.