fix: Reuse vector in LocalPartition
Summary:
X-link: facebookincubator/nimble#122

More than 10% of the CPU time is spent on the destruction of the local partition output when the load is high.
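To illustrate the reuse pattern (the LocalPartition change itself is in one of the changed files not shown in this excerpt), here is a minimal sketch, assuming a hypothetical operator that previously built and destroyed a fresh output container on every batch; all names below are invented for illustration:

#include <cstdint>
#include <vector>

class OutputBuffer {
 public:
  // Returns a buffer of 'size' elements, reusing the previous allocation.
  std::vector<int32_t>& next(int32_t size) {
    data_.clear(); // Keeps capacity; nothing is deallocated.
    data_.resize(size); // Reuses the existing allocation when it fits.
    return data_;
  }

 private:
  // Reused across batches instead of being constructed and destroyed on
  // every call, which is the destruction cost seen in the CPU profile.
  std::vector<int32_t> data_;
};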

Also add some optimizations for serialization. The optimization on `ByteOutputStream::appendOneBool` does not show a significant gain in the example query (because it has a lot of small batches), but it is a net gain and would be significant for large batches, so I am leaving it in the code.
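For context, `appendOneBool<kValue>()` (added in the ByteStream.h diff below) appends a single bit whose value is known at compile time, so the `setBit`/`clearBit` choice is resolved by `if constexpr` and the count loop of `appendBool(value, 1)` disappears on this hot path. A sketch of a call site, assuming a hypothetical serializer function (this exact caller is not part of the shown diff):

// Appends one validity bit per row to 'out'. 'nulls' is a bit vector with
// one bit per row; bits::isBitSet is the Velox bit-test helper.
void serializeNullFlags(
    ByteOutputStream& out,
    const uint64_t* nulls,
    int32_t numRows) {
  for (int32_t i = 0; i < numRows; ++i) {
    if (bits::isBitSet(nulls, i)) {
      out.appendOneBool<true>();
    } else {
      out.appendOneBool<false>();
    }
  }
}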

Differential Revision: D67742489
Yuhta authored and facebook-github-bot committed Jan 2, 2025
1 parent cd6431a commit d4fbc7e
Showing 13 changed files with 275 additions and 171 deletions.
104 changes: 54 additions & 50 deletions velox/common/memory/ByteStream.cpp
@@ -194,31 +194,30 @@ size_t ByteOutputStream::size() const {
   for (auto i = 0; i < ranges_.size() - 1; ++i) {
     total += ranges_[i].size;
   }
-  return total + std::max(ranges_.back().position, lastRangeEnd_);
+  updateEnd();
+  return total + lastRangeEnd_;
 }
 
 void ByteOutputStream::appendBool(bool value, int32_t count) {
   VELOX_DCHECK(isBits_);
 
-  if (count == 1 && current_->size > current_->position) {
+  if (count == 1 && current_.size > current_.position) {
     bits::setBit(
-        reinterpret_cast<uint64_t*>(current_->buffer),
-        current_->position,
-        value);
-    ++current_->position;
+        reinterpret_cast<uint64_t*>(current_.buffer), current_.position, value);
+    ++current_.position;
     return;
   }
 
   int32_t offset{0};
   for (;;) {
     const int32_t bitsFit =
-        std::min(count - offset, current_->size - current_->position);
+        std::min(count - offset, current_.size - current_.position);
     bits::fillBits(
-        reinterpret_cast<uint64_t*>(current_->buffer),
-        current_->position,
-        current_->position + bitsFit,
+        reinterpret_cast<uint64_t*>(current_.buffer),
+        current_.position,
+        current_.position + bitsFit,
         value);
-    current_->position += bitsFit;
+    current_.position += bitsFit;
     offset += bitsFit;
     if (offset == count) {
       return;
@@ -237,15 +236,15 @@ void ByteOutputStream::appendBits(
   int32_t offset = 0;
   for (;;) {
     const int32_t bitsFit =
-        std::min(count - offset, current_->size - current_->position);
+        std::min(count - offset, current_.size - current_.position);
     bits::copyBits(
         bits,
         begin + offset,
-        reinterpret_cast<uint64_t*>(current_->buffer),
-        current_->position,
+        reinterpret_cast<uint64_t*>(current_.buffer),
+        current_.position,
         bitsFit);
 
-    current_->position += bitsFit;
+    current_.position += bitsFit;
     offset += bitsFit;
     if (offset == count) {
       return;
@@ -263,10 +262,10 @@ void ByteOutputStream::appendStringView(std::string_view value) {
   int32_t offset = 0;
   for (;;) {
     const int32_t bytesFit =
-        std::min(bytes - offset, current_->size - current_->position);
+        std::min(bytes - offset, current_.size - current_.position);
     simd::memcpy(
-        current_->buffer + current_->position, value.data() + offset, bytesFit);
-    current_->position += bytesFit;
+        current_.buffer + current_.position, value.data() + offset, bytesFit);
+    current_.position += bytesFit;
     offset += bytesFit;
     if (offset == bytes) {
       return;
@@ -279,13 +278,13 @@ std::streampos ByteOutputStream::tellp() const {
   if (ranges_.empty()) {
     return 0;
   }
-  assert(current_);
+  VELOX_DCHECK_NOT_NULL(current_.buffer);
   int64_t size = 0;
-  for (auto& range : ranges_) {
-    if (&range == current_) {
-      return current_->position + size;
+  for (int i = 0; i < ranges_.size(); ++i) {
+    if (i == currentIndex_) {
+      return current_.position + size;
     }
-    size += range.size;
+    size += ranges_[i].size;
   }
   VELOX_FAIL("ByteOutputStream 'current_' is not in 'ranges_'.");
 }
@@ -297,13 +296,14 @@ void ByteOutputStream::seekp(std::streampos position) {
   if (ranges_.empty() && position == 0) {
     return;
   }
-  for (auto& range : ranges_) {
-    if (toSkip <= range.size) {
-      current_ = &range;
-      current_->position = toSkip;
+  for (int i = 0; i < ranges_.size(); ++i) {
+    if (toSkip <= ranges_[i].size) {
+      current_ = ranges_[i];
+      currentIndex_ = i;
+      current_.position = toSkip;
       return;
     }
-    toSkip -= range.size;
+    toSkip -= ranges_[i].size;
   }
   static_assert(sizeof(std::streamsize) <= sizeof(long long));
   VELOX_FAIL(
@@ -330,40 +330,42 @@ char* ByteOutputStream::writePosition() {
   if (ranges_.empty()) {
     return nullptr;
   }
-  return reinterpret_cast<char*>(current_->buffer) + current_->position;
+  return reinterpret_cast<char*>(current_.buffer) + current_.position;
 }
 
 void ByteOutputStream::extend(int32_t bytes) {
-  if (current_ && current_->position != current_->size) {
+  if (current_.buffer && current_.position != current_.size) {
     LOG(FATAL) << "Extend ByteOutputStream before range full: "
-               << current_->position << " vs. " << current_->size;
+               << current_.position << " vs. " << current_.size;
   }
 
   // Check if rewriting existing content. If so, move to next range and start at
   // 0.
-  if ((current_ != nullptr) && (current_ != &ranges_.back())) {
-    ++current_;
-    current_->position = 0;
+  if (!ranges_.empty() && currentIndex_ < ranges_.size() - 1) {
+    current_ = ranges_[++currentIndex_];
+    current_.position = 0;
     return;
   }
 
-  ranges_.emplace_back();
-  current_ = &ranges_.back();
+  current_ = {};
+  currentIndex_ = ranges_.size();
   lastRangeEnd_ = 0;
   if (bytes == 0) {
     // Only initialize, do not allocate if bytes is 0.
+    ranges_.push_back(current_);
     return;
   }
   arena_->newRange(
       newRangeSize(bytes),
-      ranges_.size() == 1 ? nullptr : &ranges_[ranges_.size() - 2],
-      current_);
-  allocatedBytes_ += current_->size;
+      ranges_.empty() ? nullptr : &ranges_.back(),
+      &current_);
+  allocatedBytes_ += current_.size;
   VELOX_CHECK_GT(allocatedBytes_, 0);
   if (isBits_) {
     // size and position are in units of bits for a bits stream.
-    current_->size *= 8;
+    current_.size *= 8;
   }
+  ranges_.push_back(current_);
 }
 
 int32_t ByteOutputStream::newRangeSize(int32_t bytes) const {
@@ -381,18 +383,19 @@ int32_t ByteOutputStream::newRangeSize(int32_t bytes) const {
 }
 
 void ByteOutputStream::ensureSpace(int32_t bytes) {
-  const auto available = current_->size - current_->position;
+  const auto available = current_.size - current_.position;
   int64_t toExtend = bytes - available;
-  const auto originalRangeIdx = current_ - ranges_.data();
-  const auto originalPosition = current_->position;
+  const auto originalRangeIdx = currentIndex_;
+  const auto originalPosition = current_.position;
   while (toExtend > 0) {
-    current_->position = current_->size;
+    current_.position = current_.size;
     extend(toExtend);
-    toExtend -= current_->size;
+    toExtend -= current_.size;
   }
   // Restore original position.
-  current_ = &ranges_[originalRangeIdx];
-  current_->position = originalPosition;
+  current_ = ranges_[originalRangeIdx];
+  currentIndex_ = originalRangeIdx;
+  current_.position = originalPosition;
 }
 
 std::unique_ptr<ByteInputStream> ByteOutputStream::inputStream() const {
@@ -406,10 +409,11 @@ std::unique_ptr<ByteInputStream> ByteOutputStream::inputStream() const {
 std::string ByteOutputStream::toString() const {
   std::stringstream oss;
   oss << "ByteOutputStream[lastRangeEnd " << lastRangeEnd_ << ", "
-      << ranges_.size() << " ranges (position/size) [";
+      << " currentIndex " << currentIndex_ << ", " << " currentPosition "
+      << current_.position << ", " << ranges_.size()
+      << " ranges (position/size) [";
   for (const auto& range : ranges_) {
-    oss << "(" << range.position << "/" << range.size
-        << (&range == current_ ? " current" : "") << ")";
+    oss << "(" << range.position << "/" << range.size << ")";
     if (&range != &ranges_.back()) {
       oss << ",";
     }
61 changes: 41 additions & 20 deletions velox/common/memory/ByteStream.h
@@ -251,7 +268,8 @@ class ByteOutputStream {
   void setRange(ByteRange range, int32_t lastWrittenPosition) {
     ranges_.resize(1);
     ranges_[0] = range;
-    current_ = ranges_.data();
+    currentIndex_ = 0;
+    current_ = range;
     VELOX_CHECK_GE(ranges_.back().size, lastWrittenPosition);
     lastRangeEnd_ = lastWrittenPosition;
   }
@@ -268,14 +269,16 @@ class ByteOutputStream {
     ranges_.clear();
     isReversed_ = false;
     allocatedBytes_ = 0;
-    current_ = nullptr;
+    currentIndex_ = -1;
+    current_.buffer = nullptr;
     lastRangeEnd_ = 0;
     extend(initialSize);
   }
 
   void seek(int32_t range, int32_t position) {
-    current_ = &ranges_[range];
-    current_->position = position;
+    currentIndex_ = range;
+    current_ = ranges_[range];
+    current_.position = position;
   }
 
   std::streampos tellp() const;
@@ -294,34 +297,50 @@ class ByteOutputStream {
   template <typename T>
   void append(folly::Range<const T*> values) {
     static_assert(std::is_trivially_copyable_v<T>);
-    if (current_->position + sizeof(T) * values.size() > current_->size) {
+    if (current_.position + sizeof(T) * values.size() > current_.size) {
       appendStringView(std::string_view(
           reinterpret_cast<const char*>(&values[0]),
           values.size() * sizeof(T)));
       return;
     }
-    auto* target = current_->buffer + current_->position;
+    auto* target = current_.buffer + current_.position;
     memcpy(target, values.data(), values.size() * sizeof(T));
-    current_->position += sizeof(T) * values.size();
+    current_.position += sizeof(T) * values.size();
   }
 
   void appendBool(bool value, int32_t count);
 
+  /// Fast path used by appending one null in vector serialization.
+  template <bool kValue>
+  void appendOneBool() {
+    VELOX_DCHECK(isBits_);
+    if (FOLLY_UNLIKELY(current_.position >= current_.size)) {
+      extend(1);
+    }
+    auto* buffer = reinterpret_cast<uint64_t*>(current_.buffer);
+    if constexpr (kValue) {
+      bits::setBit(buffer, current_.position);
+    } else {
+      bits::clearBit(buffer, current_.position);
+    }
+    ++current_.position;
+  }
+
   // A fast path for appending bits into pre-cleared buffers after first extend.
   inline void
   appendBitsFresh(const uint64_t* bits, int32_t begin, int32_t end) {
-    const auto position = current_->position;
+    const auto position = current_.position;
     if (begin == 0 && end <= 56) {
-      const auto available = current_->size - position;
+      const auto available = current_.size - position;
       // There must be 8 bytes writable. If available is 56, there are 7, so >.
       if (available > 56) {
         const auto offset = position & 7;
         const auto mask = bits::lowMask(offset);
-        auto* buffer = current_->buffer + (position >> 3);
+        auto* buffer = current_.buffer + (position >> 3);
         auto value = folly::loadUnaligned<uint64_t>(buffer);
         value = (value & mask) | (bits[0] << offset);
         folly::storeUnaligned(buffer, value);
-        current_->position += end;
+        current_.position += end;
         return;
       }
     }
@@ -364,13 +383,13 @@ class ByteOutputStream {
   template <typename T>
   uint8_t* getAppendWindow(int32_t size, ScratchPtr<T>& scratchPtr) {
     const int32_t bytes = sizeof(T) * size;
-    if (!current_) {
+    if (!current_.buffer) {
       extend(bytes);
     }
-    auto available = current_->size - current_->position;
+    auto available = current_.size - current_.position;
     if (available >= bytes) {
-      current_->position += bytes;
-      return current_->buffer + current_->position - bytes;
+      current_.position += bytes;
+      return current_.buffer + current_.position - bytes;
     }
     // If the tail is not large enough, make temp of the right size
     // in scratch. Extend the stream so that there is guaranteed space to copy
@@ -390,9 +409,9 @@ class ByteOutputStream {
   int32_t newRangeSize(int32_t bytes) const;
 
   void updateEnd() const {
-    if (!ranges_.empty() && current_ == &ranges_.back() &&
-        current_->position > lastRangeEnd_) {
-      lastRangeEnd_ = current_->position;
+    if (!ranges_.empty() && currentIndex_ == ranges_.size() - 1 &&
+        current_.position > lastRangeEnd_) {
+      lastRangeEnd_ = current_.position;
     }
   }
 
@@ -411,8 +430,10 @@ class ByteOutputStream {
   // The total number of bytes allocated from 'arena_' in 'ranges_'.
   int64_t allocatedBytes_{0};
 
-  // Pointer to the current element of 'ranges_'.
-  ByteRange* current_{nullptr};
+  // Copy of the current element in 'ranges_'. This is copied to avoid memory
+  // thrashing when the number of streams is large.
+  ByteRange current_{};
+  int32_t currentIndex_{-1};
 
   // Number of bits/bytes that have been written in the last element
   // of 'ranges_'. In a write situation, all non-last ranges are full
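As a standalone sketch (not Velox code) of the pattern behind `current_` and `currentIndex_` above: hold a by-value copy of the hot vector element plus its index rather than a pointer into the vector. A pointer would have to be refreshed whenever the vector reallocates, and the copy keeps the frequently touched buffer/size/position fields directly in the object instead of behind an indirection on every append. All names below are illustrative:

#include <cstdint>
#include <vector>

struct Range {
  uint8_t* buffer = nullptr;
  int32_t size = 0;
  int32_t position = 0;
};

struct Stream {
  std::vector<Range> ranges;
  Range current; // By-value copy of ranges[currentIndex].
  int32_t currentIndex = -1;

  void seek(int32_t range, int32_t position) {
    currentIndex = range;
    current = ranges[range]; // Copy out; subsequent writes go to 'current'.
    current.position = position;
  }

  // Writes the copy back when the element's state must be visible through
  // 'ranges', e.g. before iterating over all of them.
  void syncBack() {
    if (currentIndex >= 0) {
      ranges[currentIndex] = current;
    }
  }
};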