Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
jinchengchenghh committed Nov 6, 2024
1 parent d821fb5 commit 833023a
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 42 deletions.
44 changes: 23 additions & 21 deletions velox/row/UnsafeRowDeserializers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,18 @@

namespace facebook::velox::row {
namespace {

inline int64_t getFieldOffset(int64_t nullBitsetWidthInBytes, int32_t index) {
return nullBitsetWidthInBytes + UnsafeRow::kFieldWidthBytes * index;
// Returns the byte offset of a column's fixed-width field slot, relative to
// the starting memory address of a row.
// @param nullBitsetWidthInBytes Width in bytes of the row's null-tracking bit
// set. The bit set is aligned to 8-byte word boundaries and stores one bit
// per field.
// @param columnIdx Zero-based column index.
inline int64_t getFieldOffset(
int64_t nullBitsetWidthInBytes,
column_index_t columnIdx) {
return nullBitsetWidthInBytes + UnsafeRow::kFieldWidthBytes * columnIdx;
}

inline bool isNullAt(const uint8_t* memoryAddress, int32_t index) {
return bits::isBitSet(memoryAddress, index);
// Returns true if the bit at position `row` is set in the bit set that starts
// at `memoryAddress`.
// NOTE(review): the null-tracking bit set is described as storing one bit per
// field, so this index may really be a column index despite the `row` name --
// confirm against callers.
inline bool isNullAt(const uint8_t* memoryAddress, vector_size_t row) {
return bits::isBitSet(memoryAddress, row);
}

size_t getTotalStringSize(
Expand Down Expand Up @@ -165,13 +170,12 @@ VectorPtr createFlatVectorFast<TypeKind::VARCHAR>(
*(int64_t*)(memoryAddress + offsets[row] + fieldOffset);
const int32_t length = static_cast<int32_t>(offsetAndSize);
const int32_t wordOffset = static_cast<int32_t>(offsetAndSize >> 32);
auto valueSrcPtr = memoryAddress + offsets[row] + wordOffset;
auto* valueSrc = memoryAddress + offsets[row] + wordOffset;
if (StringView::isInline(length)) {
column->set(
row,
StringView(reinterpret_cast<const char*>(valueSrcPtr), length));
row, StringView(reinterpret_cast<const char*>(valueSrc), length));
} else {
memcpy(rawBuffer, valueSrcPtr, length);
memcpy(rawBuffer, valueSrc, length);
column->setNoCopy(row, StringView(rawBuffer, length));
rawBuffer += length;
}
Expand Down Expand Up @@ -227,11 +231,11 @@ VectorPtr deserializeFast(
const uint8_t* memoryAddress,
const RowTypePtr& type,
const std::vector<int64_t>& offsets,
vector_size_t numRows,
memory::MemoryPool* pool) {
const auto numFields = type->size();
const int64_t nullBitsetWidthInBytes = UnsafeRow::getNullLength(numFields);
std::vector<VectorPtr> columns(numFields);
const vector_size_t numRows = offsets.size();
for (auto i = 0; i < numFields; i++) {
const auto fieldOffset = getFieldOffset(nullBitsetWidthInBytes, i);
const auto& colType = type->childAt(i);
Expand Down Expand Up @@ -262,18 +266,16 @@ VectorPtr UnsafeRowDeserializer::deserialize(
const RowTypePtr& type,
const std::vector<int64_t>& offsets,
memory::MemoryPool* pool) {
const vector_size_t numRows = offsets.size() - 1;
if (fastSupported(type)) {
return deserializeFast(memoryAddress, type, offsets, pool);
} else {
std::vector<std::optional<std::string_view>> data;
const vector_size_t numRows = offsets.size();
for (auto i = 0; i < numRows; i++) {
const auto length =
(i == numRows - 1 ? offsets[i] : offsets[i + 1] - offsets[i]);
data.emplace_back(std::string_view(
reinterpret_cast<const char*>(memoryAddress + offsets[i]), length));
}
return deserialize(data, type, pool);
return deserializeFast(memoryAddress, type, offsets, numRows, pool);
}
std::vector<std::optional<std::string_view>> data;
for (auto i = 0; i < numRows; i++) {
const auto length = offsets[i + 1] - offsets[i];
data.emplace_back(std::string_view(
reinterpret_cast<const char*>(memoryAddress + offsets[i]), length));
}
return deserialize(data, type, pool);
}
} // namespace facebook::velox::row
48 changes: 29 additions & 19 deletions velox/row/UnsafeRowDeserializers.h
Original file line number Diff line number Diff line change
Expand Up @@ -553,31 +553,41 @@ struct UnsafeRowPrimitiveBatchDeserializer {
*/
struct UnsafeRowDeserializer {
public:
/// Deserialize the rows that are serialized in contiguous memory.
/// Deserialize fast when all the column types are primitive type, otherwise,
/// deserialize rows one by one, as previously. Fast deserialization
/// computes the start memoryAddress of one data by the column index and row
/// number, and set all the data of one column at once.
/// @param memoryAddress the start memory address of the serialized rows.
/// @param type the element type.
/// @param offsets offset of each row's serialized data. Its size should be
/// equal to the number of deserialized rows. The first offset is 0.
/// @param pool the memory pool to allocate Vector.
/// Deserializes rows that are stored in contiguous memory.
/// If all column types are primitive, fast deserialization is used.
/// Otherwise, rows are deserialized one by one as before.
/// Fast deserialization calculates the starting memory address of data
/// based on the column index and row number, setting all data for one column
/// at once.
/// The null-tracking bit set is aligned to 8-byte word boundaries. It stores
/// one bit per field.
/// Each row has three parts: [null-tracking bit set] [values] [variable
/// length portion]. In the `values` region, we store one 8-byte word per
/// field. For fields that hold fixed-length primitive types, such as long,
/// double, or int, we store the value directly in the word. For fields with
/// non-primitive or variable-length values, we store a relative offset
/// (w.r.t. the base address of the row) that points to the beginning of the
/// variable-length field, and length (they are combined into a long).
/// Therefore, the starting memory address of each row is found from its
/// offset, and the data of one column within a row is found at the field
/// offset, which is computed from the fixed-width null bit set and the
/// fixed-width column words.
/// @param memoryAddress The starting memory address of the serialized rows.
/// @param type The element type.
/// @param offsets Offset of each row's serialized data. Its size should be
/// equal to the number of deserialized rows + 1. The first offset is 0 and
/// the last offset marks the end of the final row.
/// @param pool The memory pool used to allocate vector.
static VectorPtr deserialize(
const uint8_t* memoryAddress,
const RowTypePtr& type,
const std::vector<int64_t>& offsets,
memory::MemoryPool* pool);

/**
* Deserializes a complex element type to its Vector representation.
* @param data A vector of string_view over a given element in the
*UnsafeRow.
* @param type the element type.
* @param pool the memory pool to allocate Vectors from
*data to a array.
* @return a VectorPtr
*/
/// Deserializes a complex element type to its Vector representation.
/// @param data A vector of string_view over a given element in the
/// UnsafeRow.
/// @param type the element type.
/// @param pool the memory pool to allocate Vectors from.
/// @return a VectorPtr
static VectorPtr deserialize(
const std::vector<std::optional<std::string_view>>& data,
const TypePtr& type,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ int deserializeFast(
offsets.push_back(offset);
offset += buffer->size();
}
offsets[nRows] = offset;
auto buffer = helper.copyBuffers(buffers, offset);
suspender.dismiss();
for (int i = 0; i < nIters; i++) {
Expand Down
5 changes: 3 additions & 2 deletions velox/row/tests/UnsafeRowFuzzTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ class UnsafeRowFuzzTests : public ::testing::Test {

std::array<char[kBufferSize], kNumBuffers> buffers_{};
char buffer_[kBufferSize * kNumBuffers];
std::vector<size_t> offsets_;
std::vector<int64_t> offsets_;

std::shared_ptr<memory::MemoryPool> pool_ =
memory::memoryManager()->addLeafPool();
Expand Down Expand Up @@ -179,12 +179,13 @@ TEST_F(UnsafeRowFuzzTests, fast) {
UnsafeRowFast fast(data);
size_t offset = 0;
const auto numRows = data->size();
offsets_.resize(numRows);
offsets_.resize(numRows + 1);
for (auto i = 0; i < numRows; ++i) {
auto rowSize = fast.serialize(i, (char*)buffer_ + offset);
offsets_[i] = offset;
offset += rowSize;
}
offsets_[numRows] = offset;
VELOX_CHECK_LE(offset, kBufferSize * kNumBuffers);

// Deserialize previous bytes back to row vector.
Expand Down

0 comments on commit 833023a

Please sign in to comment.