Skip to content

Commit

Permalink
Fix AssignUniqueId to avoid generating duplicate IDs
Browse files Browse the repository at this point in the history
Summary: Fix off-by-one bug in AssignUniqueId operator. It caused the operator to produce duplicate IDs if it ran out of IDs while processing input batch.

Differential Revision: D59866047
  • Loading branch information
mbasmanova authored and facebook-github-bot committed Jul 17, 2024
1 parent 86efb63 commit 1e32d69
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 40 deletions.
21 changes: 15 additions & 6 deletions velox/exec/AssignUniqueId.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ AssignUniqueId::AssignUniqueId(
planNode->id(),
"AssignUniqueId"),
rowIdPool_(std::move(rowIdPool)) {
VELOX_CHECK_LT(uniqueTaskId, kTaskUniqueIdLimit)
VELOX_USER_CHECK_LT(
uniqueTaskId,
kTaskUniqueIdLimit,
"Unique 24-bit ID specified for AssignUniqueId exceeds the limit")
uniqueValueMask_ = ((int64_t)uniqueTaskId) << 40;

const auto numColumns = planNode->outputType()->size();
Expand Down Expand Up @@ -88,13 +91,19 @@ void AssignUniqueId::generateIdColumn(vector_size_t size) {
requestRowIds();
}

auto batchSize =
std::min(maxRowIdCounterValue_ - rowIdCounter_ + 1, kRowIdsPerRequest);
auto end = (int32_t)std::min((int64_t)size, start + batchSize);
VELOX_CHECK_EQ((rowIdCounter_ + end - 1) & uniqueValueMask_, 0);
const auto numAvailableIds =
std::min(maxRowIdCounterValue_ - rowIdCounter_, kRowIdsPerRequest);
const auto end = (int32_t)std::min((int64_t)size, start + numAvailableIds);
VELOX_CHECK_EQ(
(rowIdCounter_ + (end - start)) & uniqueValueMask_,
0,
"Ran out of unique IDs at {}. Need {} more.",
rowIdCounter_,
(end - start));
std::iota(
rawResults + start, rawResults + end, uniqueValueMask_ | rowIdCounter_);
rowIdCounter_ += end;

rowIdCounter_ += (end - start);
start = end;
}
}
Expand Down
88 changes: 54 additions & 34 deletions velox/exec/tests/AssignUniqueIdTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "velox/common/base/tests/GTestUtils.h"
#include "velox/exec/PlanNodeStats.h"
#include "velox/exec/tests/utils/AssertQueryBuilder.h"
#include "velox/exec/tests/utils/OperatorTestBase.h"
#include "velox/exec/tests/utils/PlanBuilder.h"
#include "velox/exec/tests/utils/QueryAssertions.h"
Expand All @@ -26,22 +28,20 @@ class AssignUniqueIdTest : public OperatorTestBase {
protected:
void verifyUniqueId(
const std::shared_ptr<const core::PlanNode>& plan,
const std::vector<RowVectorPtr>& inputRows,
const int numThreads = 1) {
const std::vector<RowVectorPtr>& input) {
CursorParameters params;
params.planNode = plan;
params.maxDrivers = numThreads;

auto result = readCursor(params, [](auto /*task*/) {});
auto numColumns = result.second[0]->childrenSize();
ASSERT_EQ(numColumns, inputRows[0]->childrenSize() + 1);
ASSERT_EQ(numColumns, input[0]->childrenSize() + 1);

std::set<int64_t> ids;
for (int i = 0; i < numColumns; i++) {
for (auto output : result.second) {
auto column = output->childAt(i);
for (auto batch = 0; batch < result.second.size(); ++batch) {
auto column = result.second[batch]->childAt(i);
if (i < numColumns - 1) {
assertEqualVectors(inputRows[0]->childAt(i), column);
assertEqualVectors(input[batch]->childAt(i), column);
} else {
auto idValues = column->asFlatVector<int64_t>()->rawValues();
std::copy(
Expand All @@ -51,12 +51,13 @@ class AssignUniqueIdTest : public OperatorTestBase {
}
}
}
vector_size_t totalInputSize = std::accumulate(
inputRows.begin(),
inputRows.end(),
0,
[](vector_size_t sum, RowVectorPtr row) { return sum + row->size(); });
ASSERT_EQ(totalInputSize * numThreads, ids.size());

vector_size_t totalInputSize = 0;
for (const auto& vector : input) {
totalInputSize += vector->size();
}

ASSERT_EQ(totalInputSize, ids.size());

auto task = result.first->task();

Expand All @@ -65,7 +66,7 @@ class AssignUniqueIdTest : public OperatorTestBase {
// ID vector. Memory should be allocated when producing first batch of
// output and re-used for subsequent batches.
auto stats = toPlanStats(task->taskStats());
ASSERT_EQ(numThreads, stats.at(uniqueNodeId_).numMemoryAllocations);
ASSERT_EQ(1, stats.at(uniqueNodeId_).numMemoryAllocations);
}

core::PlanNodeId uniqueNodeId_;
Expand All @@ -75,9 +76,8 @@ TEST_F(AssignUniqueIdTest, multiBatch) {
vector_size_t batchSize = 1000;
std::vector<RowVectorPtr> input;
for (int i = 0; i < 3; ++i) {
auto column =
makeFlatVector<int32_t>(batchSize, [](auto row) { return row; });
input.push_back(makeRowVector({column}));
input.push_back(
makeRowVector({makeFlatVector<int32_t>(batchSize, folly::identity)}));
}

auto plan = PlanBuilder()
Expand All @@ -91,8 +91,12 @@ TEST_F(AssignUniqueIdTest, multiBatch) {

TEST_F(AssignUniqueIdTest, exceedRequestLimit) {
vector_size_t requestLimit = 1 << 20L;
auto input = {makeRowVector({makeFlatVector<int32_t>(
requestLimit + 1, [](auto row) { return row; })})};
auto input = {
makeRowVector(
{makeFlatVector<int32_t>(requestLimit - 10, folly::identity)}),
makeRowVector({makeFlatVector<int32_t>(100, folly::identity)}),
makeRowVector({makeFlatVector<int32_t>(100, folly::identity)}),
};

auto plan = PlanBuilder()
.values(input)
Expand All @@ -106,42 +110,58 @@ TEST_F(AssignUniqueIdTest, exceedRequestLimit) {
TEST_F(AssignUniqueIdTest, multiThread) {
for (int i = 0; i < 3; i++) {
vector_size_t batchSize = 1000;
auto input = {makeRowVector(
{makeFlatVector<int32_t>(batchSize, [](auto row) { return row; })})};
auto input = {
makeRowVector({makeFlatVector<int32_t>(batchSize, folly::identity)})};
auto plan = PlanBuilder()
.values(input, true)
.assignUniqueId()
.capturePlanNodeId(uniqueNodeId_)
.planNode();

verifyUniqueId(plan, input, 8);
std::shared_ptr<exec::Task> task;
auto result =
AssertQueryBuilder(plan).maxDrivers(8).copyResults(pool(), task);
ASSERT_EQ(batchSize * 8, result->size());

std::set<int64_t> ids;
auto idValues =
result->children().back()->asFlatVector<int64_t>()->rawValues();
std::copy(
idValues, idValues + result->size(), std::inserter(ids, ids.end()));

ASSERT_EQ(batchSize * 8, ids.size());

// Verify number of memory allocations. There should be exactly one
// allocation (per thread of execution) for the values buffer of the unique
// ID vector. Memory should be allocated when producing first batch of
// output and re-used for subsequent batches.
auto stats = toPlanStats(task->taskStats());
ASSERT_EQ(8, stats.at(uniqueNodeId_).numMemoryAllocations);
}
}

TEST_F(AssignUniqueIdTest, maxRowIdLimit) {
auto input = {makeRowVector({makeFlatVector<int32_t>({1, 2, 3})})};

auto plan = PlanBuilder()
.values(input)
.assignUniqueId()
.capturePlanNodeId(uniqueNodeId_)
.planNode();
auto plan = PlanBuilder().values(input).assignUniqueId().planNode();

// Increase the counter to kMaxRowId.
std::dynamic_pointer_cast<const core::AssignUniqueIdNode>(plan)
->uniqueIdCounter()
->fetch_add(1L << 40);

EXPECT_THROW(verifyUniqueId(plan, input), VeloxRuntimeError);
VELOX_ASSERT_THROW(
AssertQueryBuilder(plan).copyResults(pool()),
"Ran out of unique IDs at 1099511627776");
}

TEST_F(AssignUniqueIdTest, taskUniqueIdLimit) {
auto input = {makeRowVector({makeFlatVector<int32_t>({1, 2, 3})})};

auto plan = PlanBuilder()
.values(input)
.assignUniqueId("unique", 1L << 24)
.capturePlanNodeId(uniqueNodeId_)
.planNode();
auto plan =
PlanBuilder().values(input).assignUniqueId("unique", 1L << 24).planNode();

EXPECT_THROW(verifyUniqueId(plan, input), VeloxRuntimeError);
VELOX_ASSERT_THROW(
AssertQueryBuilder(plan).copyResults(pool()),
"(16777216 vs. 16777216) Unique 24-bit ID specified for AssignUniqueId exceeds the limit");
}

0 comments on commit 1e32d69

Please sign in to comment.