diff --git a/velox/common/memory/ByteStream.cpp b/velox/common/memory/ByteStream.cpp index ae503a19237e..7ff3bd1d80d4 100644 --- a/velox/common/memory/ByteStream.cpp +++ b/velox/common/memory/ByteStream.cpp @@ -350,6 +350,9 @@ void ByteOutputStream::extend(int32_t bytes) { ranges_.emplace_back(); current_ = &ranges_.back(); lastRangeEnd_ = 0; + if (bytes == 0) { + return; + } arena_->newRange( newRangeSize(bytes), ranges_.size() == 1 ? nullptr : &ranges_[ranges_.size() - 2], diff --git a/velox/serializers/PrestoSerializer.cpp b/velox/serializers/PrestoSerializer.cpp index ecbf1be4b911..26000c49e611 100644 --- a/velox/serializers/PrestoSerializer.cpp +++ b/velox/serializers/PrestoSerializer.cpp @@ -1386,7 +1386,8 @@ class VectorStream { } // The first element in the offsets in the wire format is always 0 for - // nested types. + // nested types. Set upon construction/reset in case empty (no append + // calls will be made). lengths_.startWrite(sizeof(vector_size_t)); lengths_.appendOne(0); } @@ -1720,7 +1721,9 @@ class VectorStream { lengths_.startWrite(lengths_.size()); if (type_->kind() == TypeKind::ROW || type_->kind() == TypeKind::ARRAY || type_->kind() == TypeKind::MAP) { - // A complex type has a 0 as first length. + // The first element in the offsets in the wire format is always 0 for + // nested types. Set upon construction/reset in case empty (no append + // calls will be made). 
lengths_.appendOne(0); } } @@ -1736,7 +1739,7 @@ class VectorStream { std::optional<VectorPtr> vector, vector_size_t initialNumRows) { initializeHeader(typeToEncodingName(type_), *streamArena_); - nulls_.startWrite(1 + (initialNumRows / 8)); + nulls_.startWrite(0); switch (type_->kind()) { case TypeKind::ROW: @@ -1745,7 +1748,6 @@ class VectorStream { [[fallthrough]]; case TypeKind::MAP: hasLengths_ = true; - lengths_.startWrite(initialNumRows * sizeof(vector_size_t)); children_.resize(type_->size()); for (int32_t i = 0; i < type_->size(); ++i) { children_[i] = std::make_unique<VectorStream>( @@ -1757,21 +1759,23 @@ class VectorStream { opts_); } // The first element in the offsets in the wire format is always 0 for - // nested types. + // nested types. Set upon construction/reset in case empty (no append + // calls will be made). + lengths_.startWrite(sizeof(vector_size_t)); lengths_.appendOne(0); break; case TypeKind::VARCHAR: [[fallthrough]]; case TypeKind::VARBINARY: hasLengths_ = true; - lengths_.startWrite(initialNumRows * sizeof(vector_size_t)); + lengths_.startWrite(0); if (values_.ranges().empty()) { - values_.startWrite(initialNumRows * 10); + values_.startWrite(0); } break; default: if (values_.ranges().empty()) { - values_.startWrite(initialNumRows * 4); + values_.startWrite(0); } break; } @@ -1978,7 +1982,6 @@ void serializeRowVector( VectorStream* stream, Scratch& scratch) { auto rowVector = vector->as<RowVector>(); - std::vector<IndexRange> childRanges; for (int32_t i = 0; i < ranges.size(); ++i) { auto begin = ranges[i].begin; diff --git a/velox/serializers/tests/PrestoSerializerTest.cpp b/velox/serializers/tests/PrestoSerializerTest.cpp index d195ca5f79a2..da03e6f39739 100644 --- a/velox/serializers/tests/PrestoSerializerTest.cpp +++ b/velox/serializers/tests/PrestoSerializerTest.cpp @@ -820,6 +820,33 @@ TEST_P(PrestoSerializerTest, emptyPage) { assertEqualVectors(deserialized, rowVector); } +TEST_P(PrestoSerializerTest, initMemory) { + const auto numRows = 100; + auto testFunc = [&](TypePtr 
type, int64_t expectedBytes) { + const auto poolMemUsage = pool_->usedBytes(); + auto arena = std::make_unique<StreamArena>(pool_.get()); + const auto paramOptions = getParamSerdeOptions(nullptr); + const auto rowType = ROW({type}); + const auto serializer = serde_->createIterativeSerializer( + rowType, numRows, arena.get(), &paramOptions); + ASSERT_EQ(pool_->usedBytes() - poolMemUsage, expectedBytes); + }; + + testFunc(BOOLEAN(), 0); + testFunc(TINYINT(), 0); + testFunc(SMALLINT(), 0); + testFunc(INTEGER(), 0); + testFunc(BIGINT(), 0); + testFunc(REAL(), 0); + testFunc(DOUBLE(), 0); + testFunc(VARCHAR(), 0); + testFunc(TIMESTAMP(), 0); + // For nested types, 2 pages allocation quantum for first offset (0). + testFunc(ROW({VARCHAR()}), 8192); + testFunc(ARRAY(INTEGER()), 8192); + testFunc(MAP(VARCHAR(), INTEGER()), 8192); +} + TEST_P(PrestoSerializerTest, serializeNoRowsSelected) { std::ostringstream out; facebook::velox::serializer::presto::PrestoOutputStreamListener listener;