Skip to content

Commit

Permalink
Track number of rows processed in Window::getOutput (#6584)
Browse files Browse the repository at this point in the history
Summary:
Streaming window needs the number of rows processed in the getOutput() call for
the result vector. Not all received rows might be assigned to a partition yet.
In fact, the last partition is always open until noMoreInput is received. So
getOutput() can only process all closed partitions which is less than the
number of input rows.

Pull Request resolved: #6584

Reviewed By: amitkdutta

Differential Revision: D49330430

Pulled By: mbasmanova

fbshipit-source-id: 7b54b1c95fdeb496f734916d7f26cb14c095f71d
  • Loading branch information
aditi-pandit authored and facebook-github-bot committed Sep 16, 2023
1 parent 79fa20f commit ba274f1
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 4 deletions.
11 changes: 8 additions & 3 deletions velox/exec/Window.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ void Window::callApplyForPartitionRows(
partitionOffset_ += numRows;
}

void Window::callApplyLoop(
vector_size_t Window::callApplyLoop(
vector_size_t numOutputRows,
const RowVectorPtr& result) {
// Compute outputs by traversing as many partitions as possible. This
Expand Down Expand Up @@ -475,6 +475,9 @@ void Window::callApplyLoop(
break;
}
}

// Return the number of processed rows.
return numOutputRows - numOutputRowsLeft;
}

RowVectorPtr Window::getOutput() {
Expand Down Expand Up @@ -506,8 +509,10 @@ RowVectorPtr Window::getOutput() {
}

// Compute the output values of window functions.
callApplyLoop(numOutputRows, result);
return result;
auto numResultRows = callApplyLoop(numOutputRows, result);
return numResultRows < numOutputRows
? std::dynamic_pointer_cast<RowVector>(result->slice(0, numResultRows))
: result;
}

} // namespace facebook::velox::exec
5 changes: 4 additions & 1 deletion velox/exec/Window.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,10 @@ class Window : public Operator {
// Computes the result vector for a single output block. The result
// consists of all the input columns followed by the results of the
// window function.
void callApplyLoop(vector_size_t numOutputRows, const RowVectorPtr& result);
// @return The number of rows processed in the loop.
vector_size_t callApplyLoop(
vector_size_t numOutputRows,
const RowVectorPtr& result);

// Converts WindowNode::Frame to Window::WindowFrame.
WindowFrame createWindowFrame(
Expand Down

0 comments on commit ba274f1

Please sign in to comment.