Skip to content

Commit

Permalink
Do not pre-allocate memory when init vector stream (#11211)
Browse files Browse the repository at this point in the history
Summary:
When creating and initializing vector streams we pre-allocate memory for all rows. This will cause unnecessary memory waste when the actual serialized row have many nulls. The null rows will not be serialized as payload, hence make use of the pre-allocated memory. Remove the pre-allocation to increase memory efficiency.
Memory usage of presto serializer decreased by up to 10x for some meta internal data.

Pull Request resolved: #11211

Reviewed By: xiaoxmeng

Differential Revision: D64149405

Pulled By: tanjialiang

fbshipit-source-id: 9eff63ba34c9ac51bd6ecce89c8e54efaa352f1e
  • Loading branch information
tanjialiang authored and facebook-github-bot committed Oct 11, 2024
1 parent a976ba5 commit daeff59
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 10 deletions.
4 changes: 4 additions & 0 deletions velox/common/memory/ByteStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,10 @@ void ByteOutputStream::extend(int32_t bytes) {
ranges_.emplace_back();
current_ = &ranges_.back();
lastRangeEnd_ = 0;
if (bytes == 0) {
// Only initialize, do not allocate if bytes is 0.
return;
}
arena_->newRange(
newRangeSize(bytes),
ranges_.size() == 1 ? nullptr : &ranges_[ranges_.size() - 2],
Expand Down
2 changes: 1 addition & 1 deletion velox/common/memory/tests/ByteStreamTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ TEST_F(ByteStreamTest, newRangeAllocation) {
{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 3}},
{{1023, 64, 64, kPageSize, 5 * kPageSize},
{1152, 1152, 1152, kPageSize + 1152, 7 * kPageSize},
{1024, 1536, 1536, kPageSize + 1536, 7 * kPageSize},
{kPageSize * 2,
kPageSize * 2,
kPageSize * 2,
Expand Down
21 changes: 12 additions & 9 deletions velox/serializers/PrestoSerializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1386,7 +1386,8 @@ class VectorStream {
}

// The first element in the offsets in the wire format is always 0 for
// nested types.
// nested types. Set upon construction/reset in case empty (no append
// calls will be made).
lengths_.startWrite(sizeof(vector_size_t));
lengths_.appendOne<int32_t>(0);
}
Expand Down Expand Up @@ -1720,7 +1721,9 @@ class VectorStream {
lengths_.startWrite(lengths_.size());
if (type_->kind() == TypeKind::ROW || type_->kind() == TypeKind::ARRAY ||
type_->kind() == TypeKind::MAP) {
// A complex type has a 0 as first length.
// The first element in the offsets in the wire format is always 0 for
// nested types. Set upon construction/reset in case empty (no append
// calls will be made).
lengths_.appendOne<int32_t>(0);
}
}
Expand All @@ -1736,7 +1739,7 @@ class VectorStream {
std::optional<VectorPtr> vector,
vector_size_t initialNumRows) {
initializeHeader(typeToEncodingName(type_), *streamArena_);
nulls_.startWrite(1 + (initialNumRows / 8));
nulls_.startWrite(0);

switch (type_->kind()) {
case TypeKind::ROW:
Expand All @@ -1745,7 +1748,6 @@ class VectorStream {
[[fallthrough]];
case TypeKind::MAP:
hasLengths_ = true;
lengths_.startWrite(initialNumRows * sizeof(vector_size_t));
children_.resize(type_->size());
for (int32_t i = 0; i < type_->size(); ++i) {
children_[i] = std::make_unique<VectorStream>(
Expand All @@ -1757,21 +1759,23 @@ class VectorStream {
opts_);
}
// The first element in the offsets in the wire format is always 0 for
// nested types.
// nested types. Set upon construction/reset in case empty (no append
// calls will be made).
lengths_.startWrite(sizeof(vector_size_t));
lengths_.appendOne<int32_t>(0);
break;
case TypeKind::VARCHAR:
[[fallthrough]];
case TypeKind::VARBINARY:
hasLengths_ = true;
lengths_.startWrite(initialNumRows * sizeof(vector_size_t));
lengths_.startWrite(0);
if (values_.ranges().empty()) {
values_.startWrite(initialNumRows * 10);
values_.startWrite(0);
}
break;
default:
if (values_.ranges().empty()) {
values_.startWrite(initialNumRows * 4);
values_.startWrite(0);
}
break;
}
Expand Down Expand Up @@ -1978,7 +1982,6 @@ void serializeRowVector(
VectorStream* stream,
Scratch& scratch) {
auto rowVector = vector->as<RowVector>();

std::vector<IndexRange> childRanges;
for (int32_t i = 0; i < ranges.size(); ++i) {
auto begin = ranges[i].begin;
Expand Down
27 changes: 27 additions & 0 deletions velox/serializers/tests/PrestoSerializerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -820,6 +820,33 @@ TEST_P(PrestoSerializerTest, emptyPage) {
assertEqualVectors(deserialized, rowVector);
}

TEST_P(PrestoSerializerTest, initMemory) {
const auto numRows = 100;
auto testFunc = [&](TypePtr type, int64_t expectedBytes) {
const auto poolMemUsage = pool_->usedBytes();
auto arena = std::make_unique<StreamArena>(pool_.get());
const auto paramOptions = getParamSerdeOptions(nullptr);
const auto rowType = ROW({type});
const auto serializer = serde_->createIterativeSerializer(
rowType, numRows, arena.get(), &paramOptions);
ASSERT_EQ(pool_->usedBytes() - poolMemUsage, expectedBytes);
};

testFunc(BOOLEAN(), 0);
testFunc(TINYINT(), 0);
testFunc(SMALLINT(), 0);
testFunc(INTEGER(), 0);
testFunc(BIGINT(), 0);
testFunc(REAL(), 0);
testFunc(DOUBLE(), 0);
testFunc(VARCHAR(), 0);
testFunc(TIMESTAMP(), 0);
// For nested types, 2 pages allocation quantum for first offset (0).
testFunc(ROW({VARCHAR()}), 8192);
testFunc(ARRAY(INTEGER()), 8192);
testFunc(MAP(VARCHAR(), INTEGER()), 8192);
}

TEST_P(PrestoSerializerTest, serializeNoRowsSelected) {
std::ostringstream out;
facebook::velox::serializer::presto::PrestoOutputStreamListener listener;
Expand Down

0 comments on commit daeff59

Please sign in to comment.