Skip to content

Commit

Permalink
s
Browse files Browse the repository at this point in the history
  • Loading branch information
binwei committed Sep 28, 2024
1 parent 1109cb6 commit 2391005
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 55 deletions.
15 changes: 0 additions & 15 deletions velox/exec/OrderBy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,6 @@ OrderBy::OrderBy(
}

void OrderBy::addInput(RowVectorPtr input) {
if (first_) {
std::cerr << " xgbtck sort start input " << std::endl;
std::cerr << pool()->root()->treeMemoryUsage() << std::endl;
//je_gluten_malloc_stats_print(NULL, NULL, NULL);
first_ = false;
second_stage=true;
}

sortBuffer_->addInput(input);
}

Expand All @@ -93,8 +85,6 @@ void OrderBy::reclaim(
VELOX_CHECK(canReclaim());
VELOX_CHECK(!nonReclaimableSection_);

std::cerr << " xgbtck sort spill start researved_size = " << researved_size << std::endl;
std::cerr << this->pool()->root()->treeMemoryUsage() << std::endl;
//je_gluten_malloc_stats_print(NULL, NULL, NULL);
// TODO: support fine-grain disk spilling based on 'targetBytes' after
// having row container memory compaction support later.
Expand All @@ -105,14 +95,9 @@ void OrderBy::reclaim(
}

void OrderBy::noMoreInput() {
std::cerr << " xgbtck sort no more input from orderby" << std::endl;
std::cerr << pool()->root()->treeMemoryUsage() << std::endl;
Operator::noMoreInput();
sortBuffer_->noMoreInput();

std::cerr << " xgbtck sort no more input after sortbuffer" << std::endl;
std::cerr << pool()->root()->treeMemoryUsage() << std::endl;

first_=true;
maxOutputRows_ = outputBatchRows(sortBuffer_->estimateOutputRowSize());
//je_gluten_malloc_stats_print(NULL, NULL, NULL);
Expand Down
39 changes: 18 additions & 21 deletions velox/exec/RowsStreamingWindowBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,41 +29,40 @@ RowsStreamingWindowBuild::RowsStreamingWindowBuild(
const common::SpillConfig* spillConfig,
tsan_atomic<bool>* nonReclaimableSection)
: WindowBuild(windowNode, pool, spillConfig, nonReclaimableSection),
inputRows_(0, memory::StlAllocator<char*>(*pool)),
windowPartitions_(0, memory::StlAllocator<char*>(*pool)){
inputRows_(0, memory::StlAllocator<char*>(*pool)){
velox::common::testutil::TestValue::adjust(
"facebook::velox::exec::RowsStreamingWindowBuild::RowsStreamingWindowBuild",
this);
// inversedInputChannels_ and sortKeyInfo_ are initialized in the base class.
// Set up the first window partition.
windowPartitions_.emplace_back(std::make_shared<WindowPartition>(pool_, data_.get(), inversedInputChannels_, sortKeyInfo_));
}

void RowsStreamingWindowBuild::addPartitionInputs(bool finished) {
if (inputRows_.empty()) {
return;
}

if (windowPartitions_.size() <= inputPartition_) {
windowPartitions_.push_back(std::allocate_shared<WindowPartition>(
memory::StlAllocator<char*>(*pool_),
pool_, data_.get(), inversedInputChannels_, sortKeyInfo_));
}

windowPartitions_[inputPartition_]->addRows(inputRows_);
windowPartitions_.back()->addRows(inputRows_);

if (finished) {
windowPartitions_[inputPartition_]->setComplete();
windowPartitions_.back()->setComplete();
++inputPartition_;
// Create a new partition for the next input.
windowPartitions_.emplace_back(std::make_shared<WindowPartition>(pool_, data_.get(), inversedInputChannels_, sortKeyInfo_));
}

inputRows_.clear();
inputRows_.shrink_to_fit();
}
bool RowsStreamingWindowBuild::needsInput() {
// No partitions are available or the currentPartition is the last available
// one, so can consume input rows.
return windowPartitions_.empty() ||
outputPartition_ == inputPartition_;
}

void RowsStreamingWindowBuild::addInput(RowVectorPtr input) {
if (first_) {
std::cerr << "xgbtck rowstreamingwindow start input " << std::endl;
std::cerr << pool_->root()->treeMemoryUsage() << std::endl;
first_ = false;
//je_gluten_malloc_stats_print(NULL, NULL, NULL);
}

for (auto i = 0; i < inputChannels_.size(); ++i) {
decodedInputVectors_[i].decode(*input->childAt(inputChannels_[i]));
Expand Down Expand Up @@ -91,23 +90,21 @@ void RowsStreamingWindowBuild::addInput(RowVectorPtr input) {
}

void RowsStreamingWindowBuild::noMoreInput() {
std::cerr << "xgbtck rowstreamingwindow no more input " << std::endl;
std::cerr << pool_->root()->treeMemoryUsage() << std::endl;
first_=true;
//je_gluten_malloc_stats_print(NULL, NULL, NULL);
addPartitionInputs(true);
}

std::shared_ptr<WindowPartition> RowsStreamingWindowBuild::nextPartition() {
VELOX_CHECK(hasNextPartition());
outputPartition_++;
std::shared_ptr<WindowPartition> output = windowPartitions_[outputPartition_];
std::shared_ptr<WindowPartition> output = std::move(windowPartitions_.front());
windowPartitions_.pop_front();
return output;
}

bool RowsStreamingWindowBuild::hasNextPartition() {
return !windowPartitions_.empty() &&
outputPartition_ + 2 <= windowPartitions_.size();
outputPartition_ + 1 <= inputPartition_;
}

} // namespace facebook::velox::exec
11 changes: 3 additions & 8 deletions velox/exec/RowsStreamingWindowBuild.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,7 @@ class RowsStreamingWindowBuild : public WindowBuild {

std::shared_ptr<WindowPartition> nextPartition() override;

bool needsInput() override {
// No partitions are available or the currentPartition is the last available
// one, so can consume input rows.
return windowPartitions_.empty() ||
outputPartition_ == windowPartitions_.size() - 1;
}
bool needsInput() override;

private:
// Adds input rows to the current partition, or creates a new partition if it
Expand All @@ -70,13 +65,13 @@ class RowsStreamingWindowBuild : public WindowBuild {
char* previousRow_ = nullptr;

// Point to the current output partition if not -1.
vector_size_t outputPartition_ = -1;
vector_size_t outputPartition_ = 0;

// Current input partition that receives inputs.
vector_size_t inputPartition_ = 0;

// Holds all the built window partitions.
std::vector<std::shared_ptr<WindowPartition>, memory::StlAllocator<char*>> windowPartitions_;
std::deque<std::shared_ptr<WindowPartition>> windowPartitions_;

bool first_ = true;
};
Expand Down
11 changes: 0 additions & 11 deletions velox/exec/SortBuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,15 +105,9 @@ void SortBuffer::noMoreInput() {

// This may trigger a spill; make sure it is triggered before noMoreInput_ is set.
if (spiller_ == nullptr) {
std::cerr << "xgbtck sortbuffer no spill yet" << std::endl;
std::cerr << this->pool()->root()->treeMemoryUsage() << std::endl;

// Force the spill to trigger.
memory::ReclaimableSectionGuard guard(nonReclaimableSection_);
pool_->maybeReserve(numInputRows_*sizeof(char*));

std::cerr << "xgbtck sortbuffer after sort buffer" << std::endl;
std::cerr << this->pool()->root()->treeMemoryUsage() << std::endl;
}

noMoreInput_ = true;
Expand Down Expand Up @@ -171,18 +165,13 @@ void SortBuffer::spill() {
if (data_->numRows() == 0) {
return;
}
std::cerr << " xgbtck sort spill start researved_size = " << std::endl;
std::cerr << this->pool()->root()->treeMemoryUsage() << std::endl;

updateEstimatedOutputRowSize();

if (sortedRows_.empty()) {
spillInput();
} else {
spillOutput();
}
std::cerr << " xgbtck spill finished researved_size = " << std::endl;
std::cerr << pool()->root()->treeMemoryUsage() << std::endl;
}

std::optional<uint64_t> SortBuffer::estimateOutputRowSize() const {
Expand Down

0 comments on commit 2391005

Please sign in to comment.