Skip to content

Commit

Permalink
Merge pull request pixelsdb#8 from AntiO2/feature/implementCppVersion…
Browse files Browse the repository at this point in the history
…OfWriter

fix: Row Group Footer
  • Loading branch information
gengdy1545 authored Dec 16, 2024
2 parents e7854fb + fa83954 commit 792a429
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 8 deletions.
2 changes: 1 addition & 1 deletion cpp/pixels-core/include/writer/IntegerColumnWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class IntegerColumnWriter : public ColumnWriter{
void newPixel() override;
void writeCurPartLong(std::shared_ptr<ColumnVector> columnVector, long* values, int curPartLength, int curPartOffset);
bool decideNullsPadding(std::shared_ptr<PixelsWriterOption> writerOption) override;
pixels::proto::ColumnEncoding getColumnChunkEncoding() const;
pixels::proto::ColumnEncoding getColumnChunkEncoding();
private:
bool isLong; //current column type is long or int, used for the first pixel
bool runlengthEncoding;
Expand Down
2 changes: 1 addition & 1 deletion cpp/pixels-core/lib/reader/PixelsRecordReaderImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ PixelsRecordReaderImpl::PixelsRecordReaderImpl(std::shared_ptr<PhysicalReader> r
includedColumnNum = 0;
endOfFile = false;
resultRowBatch = nullptr;

::DirectUringRandomAccessFile::Initialize();
checkBeforeRead();
}

Expand Down
2 changes: 1 addition & 1 deletion cpp/pixels-core/lib/writer/IntegerColumnWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ void IntegerColumnWriter::newPixel()
ColumnWriter::newPixel();
}

pixels::proto::ColumnEncoding IntegerColumnWriter::getColumnChunkEncoding() const
pixels::proto::ColumnEncoding IntegerColumnWriter::getColumnChunkEncoding()
{
pixels::proto::ColumnEncoding columnEncoding;
if (runlengthEncoding)
Expand Down
3 changes: 2 additions & 1 deletion cpp/tests/writer/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,5 @@ target_link_libraries(
set(GTEST_DIR "${PROJECT_SOURCE_DIR}/third-party/googletest")
include_directories(${GTEST_DIR}/googletest/include)
include_directories(${PROJECT_SOURCE_DIR}/pixels-core/include)
include_directories(${PROJECT_SOURCE_DIR}/pixels-common/include)
include_directories(${PROJECT_SOURCE_DIR}/pixels-common/include)
include_directories(${CMAKE_CURRENT_BINARY_DIR}/../../pixels-common/liburing/src/include)
102 changes: 98 additions & 4 deletions cpp/tests/writer/PixelsWriterTest.cpp
Original file line number Diff line number Diff line change
@@ -1,21 +1,68 @@
#include "PixelsWriterImpl.h"

#include "PixelsReaderImpl.h"
#include "physical/PhysicalReaderUtil.h"
#include "gtest/gtest.h"

class PIXELS_WRITER_TEST : public ::testing::Test
{
protected:
protected:
const int pixels_stride_ = 16;
const std::string target_file_path_{"test.dat"};
const std::string target_file_path_{"/home/anti/work/anti_pixels/cpp/build/debug/test.pxl"};
const int block_size_ = 1024;
const int compression_block_size_ = 16;
bool block_padding_ = true;
int row_num = 8888;
int row_num = 10;
const int row_group_size_ = 10;
};

TEST_F(PIXELS_WRITER_TEST, SINGLE_INT)
TEST_F(PIXELS_WRITER_TEST, DISABLED_SINGLE_INT)
{
auto schema = TypeDescription::fromString("struct<a:int>");
EXPECT_TRUE(schema);
std::vector<bool> encode_vector(1, true);
auto row_batch = schema->createRowBatch(row_group_size_, encode_vector);

EncodingLevel encoding_level{EncodingLevel::EL2};
bool nulls_padding = true;
bool partitioned = true;

auto pixels_writer = std::make_unique<PixelsWriterImpl>(schema, pixels_stride_, row_group_size_, target_file_path_,
block_size_, block_padding_, encoding_level, nulls_padding, partitioned, compression_block_size_);

/**=======================
* * INFO
* Write Row Batch
*
*========================**/
{
auto va = std::dynamic_pointer_cast<LongColumnVector>(row_batch->cols[0]);
ASSERT_TRUE(va);
auto start_time_ts = std::chrono::high_resolution_clock::now();
{
for (int i = 0; i < row_num; ++i)
{
auto row = row_batch->rowCount++;
va->add(i);
if (row_batch->rowCount == row_batch->getMaxSize())
{
pixels_writer->addRowBatch(row_batch);
row_batch->reset();
}
}
if(row_batch->rowCount!=0) {
pixels_writer->addRowBatch(row_batch);
row_batch->reset();
}
pixels_writer->close();
}
auto end_time_ts = std::chrono::high_resolution_clock::now();
auto duration = end_time_ts - start_time_ts;
std::cerr << "[DEBUG] Time: " << duration.count() << std::endl;
}
}

TEST_F(PIXELS_WRITER_TEST, WRITE_AND_READ)
{
auto schema = TypeDescription::fromString("struct<a:int>");
EXPECT_TRUE(schema);
Expand Down Expand Up @@ -59,4 +106,51 @@ TEST_F(PIXELS_WRITER_TEST, SINGLE_INT)
auto duration = end_time_ts - start_time_ts;
std::cerr << "[DEBUG] Time: " << duration.count() << std::endl;
}

{
auto storage = Storage::fromPath("file:/" + target_file_path_);
auto physical_reader = PhysicalReaderUtil::newPhysicalReader(storage, target_file_path_);

auto footer_cache = std::make_shared<PixelsFooterCache>();
auto file_tail = std::make_shared<pixels::proto::FileTail>();
{
// get file tail
long fileLen = physical_reader->getFileLength();
physical_reader->seek(fileLen - sizeof(long));
long fileTailOffset = physical_reader->readLong();
int fileTailLength = (int) (fileLen - fileTailOffset - sizeof(long));
physical_reader->seek(fileTailOffset);
auto fileTailBuffer = physical_reader->readFully(fileTailLength);
file_tail->ParseFromArray(fileTailBuffer->getPointer(), fileTailBuffer->size());
footer_cache->putFileTail(target_file_path_, file_tail);
}
// test read
auto pixels_reader = std::make_unique<PixelsReaderImpl>(
schema,
physical_reader,
file_tail,
footer_cache
);
std::cerr<< "[DEBUG] row group num:" << pixels_reader->getRowGroupNum() << std::endl;

PixelsReaderOption option;
option.setSkipCorruptRecords(false);
option.setTolerantSchemaEvolution(true);
option.setEnableEncodedColumnVector(true);
option.setIncludeCols({"a"});
option.setBatchSize(10);
option.setRGRange(0,1);
auto recordReader = pixels_reader->read(option);
auto rowBatch = recordReader->readBatch(true);
auto vector = std::static_pointer_cast<LongColumnVector>(rowBatch->cols[0]);
{
// check read result
for(int i = 0; i < row_num; i++) {
EXPECT_EQ(vector->intVector[i], i);
}
}
auto pause = true;
return;
}

}

0 comments on commit 792a429

Please sign in to comment.