Skip to content

Commit

Permalink
[WIP] PO DEBUG
Browse files Browse the repository at this point in the history
  • Loading branch information
tanjialiang committed Oct 10, 2024
1 parent 9cf4ee0 commit 8773f2a
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 9 deletions.
3 changes: 3 additions & 0 deletions velox/common/memory/ByteStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,9 @@ void ByteOutputStream::extend(int32_t bytes) {
ranges_.emplace_back();
current_ = &ranges_.back();
lastRangeEnd_ = 0;
if (bytes == 0) {
return;
}
arena_->newRange(
newRangeSize(bytes),
ranges_.size() == 1 ? nullptr : &ranges_[ranges_.size() - 2],
Expand Down
21 changes: 12 additions & 9 deletions velox/serializers/PrestoSerializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1386,7 +1386,8 @@ class VectorStream {
}

// The first element in the offsets in the wire format is always 0 for
// nested types.
// nested types. Set upon construction/reset in case empty (no append
// calls will be made).
lengths_.startWrite(sizeof(vector_size_t));
lengths_.appendOne<int32_t>(0);
}
Expand Down Expand Up @@ -1720,7 +1721,9 @@ class VectorStream {
lengths_.startWrite(lengths_.size());
if (type_->kind() == TypeKind::ROW || type_->kind() == TypeKind::ARRAY ||
type_->kind() == TypeKind::MAP) {
// A complex type has a 0 as first length.
// The first element in the offsets in the wire format is always 0 for
// nested types. Set upon construction/reset in case empty (no append
// calls will be made).
lengths_.appendOne<int32_t>(0);
}
}
Expand All @@ -1736,7 +1739,7 @@ class VectorStream {
std::optional<VectorPtr> vector,
vector_size_t initialNumRows) {
initializeHeader(typeToEncodingName(type_), *streamArena_);
nulls_.startWrite(1 + (initialNumRows / 8));
nulls_.startWrite(0);

switch (type_->kind()) {
case TypeKind::ROW:
Expand All @@ -1745,7 +1748,6 @@ class VectorStream {
[[fallthrough]];
case TypeKind::MAP:
hasLengths_ = true;
lengths_.startWrite(initialNumRows * sizeof(vector_size_t));
children_.resize(type_->size());
for (int32_t i = 0; i < type_->size(); ++i) {
children_[i] = std::make_unique<VectorStream>(
Expand All @@ -1757,21 +1759,23 @@ class VectorStream {
opts_);
}
// The first element in the offsets in the wire format is always 0 for
// nested types.
// nested types. Set upon construction/reset in case empty (no append
// calls will be made).
lengths_.startWrite(sizeof(vector_size_t));
lengths_.appendOne<int32_t>(0);
break;
case TypeKind::VARCHAR:
[[fallthrough]];
case TypeKind::VARBINARY:
hasLengths_ = true;
lengths_.startWrite(initialNumRows * sizeof(vector_size_t));
lengths_.startWrite(0);
if (values_.ranges().empty()) {
values_.startWrite(initialNumRows * 10);
values_.startWrite(0);
}
break;
default:
if (values_.ranges().empty()) {
values_.startWrite(initialNumRows * 4);
values_.startWrite(0);
}
break;
}
Expand Down Expand Up @@ -1978,7 +1982,6 @@ void serializeRowVector(
VectorStream* stream,
Scratch& scratch) {
auto rowVector = vector->as<RowVector>();

std::vector<IndexRange> childRanges;
for (int32_t i = 0; i < ranges.size(); ++i) {
auto begin = ranges[i].begin;
Expand Down
27 changes: 27 additions & 0 deletions velox/serializers/tests/PrestoSerializerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -820,6 +820,33 @@ TEST_P(PrestoSerializerTest, emptyPage) {
assertEqualVectors(deserialized, rowVector);
}

TEST_P(PrestoSerializerTest, initMemory) {
const auto numRows = 100;
auto testFunc = [&](TypePtr type, int64_t expectedBytes) {
const auto poolMemUsage = pool_->usedBytes();
auto arena = std::make_unique<StreamArena>(pool_.get());
const auto paramOptions = getParamSerdeOptions(nullptr);
const auto rowType = ROW({type});
const auto serializer = serde_->createIterativeSerializer(
rowType, numRows, arena.get(), &paramOptions);
ASSERT_EQ(pool_->usedBytes() - poolMemUsage, expectedBytes);
};

testFunc(BOOLEAN(), 0);
testFunc(TINYINT(), 0);
testFunc(SMALLINT(), 0);
testFunc(INTEGER(), 0);
testFunc(BIGINT(), 0);
testFunc(REAL(), 0);
testFunc(DOUBLE(), 0);
testFunc(VARCHAR(), 0);
testFunc(TIMESTAMP(), 0);
// For nested types, 2 pages allocation quantum for first offset (0).
testFunc(ROW({VARCHAR()}), 8192);
testFunc(ARRAY(INTEGER()), 8192);
testFunc(MAP(VARCHAR(), INTEGER()), 8192);
}

TEST_P(PrestoSerializerTest, serializeNoRowsSelected) {
std::ostringstream out;
facebook::velox::serializer::presto::PrestoOutputStreamListener listener;
Expand Down

0 comments on commit 8773f2a

Please sign in to comment.