diff --git a/velox/expression/Expr.cpp b/velox/expression/Expr.cpp index b92caeedefc53..3e7bc38cb130b 100644 --- a/velox/expression/Expr.cpp +++ b/velox/expression/Expr.cpp @@ -507,6 +507,8 @@ void Expr::evalSimplifiedImpl( inputValue->encoding() == VectorEncoding::Simple::ROW || inputValue->encoding() == VectorEncoding::Simple::FUNCTION); }; + auto releaseInputsGuard = + folly::makeGuard([&]() { releaseInputValues(context); }); if (defaultNulls) { if (!evalArgsDefaultNulls(remainingRows, evalArg, context, result)) { @@ -530,7 +532,6 @@ void Expr::evalSimplifiedImpl( // Make sure the returned vector has its null bitmap properly set. addNulls(rows, remainingRows.rows().asRange().bits(), context, result); - releaseInputValues(context); } namespace { diff --git a/velox/expression/fuzzer/ExpressionFuzzerVerifier.cpp b/velox/expression/fuzzer/ExpressionFuzzerVerifier.cpp index 1ede9f9cccb96..9e71393a4fcc5 100644 --- a/velox/expression/fuzzer/ExpressionFuzzerVerifier.cpp +++ b/velox/expression/fuzzer/ExpressionFuzzerVerifier.cpp @@ -50,23 +50,6 @@ SelectivityVector extractNonNullRows(const RowVectorPtr& data) { } return nonNullRows; } - -/// Wraps child vectors of the specified 'rowVector' in dictionary using -/// specified 'indices'. Returns new RowVector created from the wrapped vectors. -RowVectorPtr wrapChildren( - const BufferPtr& indices, - const RowVectorPtr& rowVector) { - auto size = indices->size() / sizeof(vector_size_t); - - std::vector newInputs; - for (const auto& child : rowVector->children()) { - newInputs.push_back( - BaseVector::wrapInDictionary(nullptr, indices, size, child)); - } - - return std::make_shared( - rowVector->pool(), rowVector->type(), nullptr, size, newInputs); -} } // namespace ExpressionFuzzerVerifier::ExpressionFuzzerVerifier( @@ -115,39 +98,73 @@ ExpressionFuzzerVerifier::ExpressionFuzzerVerifier( } } -InputRowMetadata ExpressionFuzzerVerifier::generateInputRowMetadata( - const RowVectorPtr& rowVector, +std::pair, InputRowMetadata> +ExpressionFuzzerVerifier::generateInput( + const RowTypePtr& rowType, VectorFuzzer& vectorFuzzer) { - InputRowMetadata inputRowMetadata; - if (options_.commonDictionaryWrapRatio <= 0 && - options_.lazyVectorGenerationRatio <= 0) { - return inputRowMetadata; - } - - bool wrapInCommonDictionary = - vectorFuzzer.coinToss(options_.commonDictionaryWrapRatio); - for (int idx = 0; idx < rowVector->childrenSize(); idx++) { - const auto& child = rowVector->childAt(idx); - VELOX_CHECK_NOT_NULL(child); - if (child->encoding() != VectorEncoding::Simple::DICTIONARY && - wrapInCommonDictionary) { - inputRowMetadata.columnsToWrapInCommonDictionary.push_back(idx); + // Randomly pick to generate one or two input rows. + std::vector inputs; + int numInputs = vectorFuzzer.coinToss(0.5) ? 1 : 2; + // Generate the metadata for the input row. + InputRowMetadata metadata; + for (int idx = 0; idx < rowType->size(); ++idx) { + if (options_.commonDictionaryWrapRatio > 0 && + vectorFuzzer.coinToss(options_.commonDictionaryWrapRatio)) { + metadata.columnsToWrapInCommonDictionary.push_back(idx); } - if (vectorFuzzer.coinToss(options_.lazyVectorGenerationRatio)) { - inputRowMetadata.columnsToWrapInLazy.push_back( + if (options_.lazyVectorGenerationRatio > 0 && + vectorFuzzer.coinToss(options_.lazyVectorGenerationRatio)) { + metadata.columnsToWrapInLazy.push_back( vectorFuzzer.coinToss(0.8) ? idx : -1 * idx); } } - // Skip wrapping in common dictionary if there is only one column. - if (inputRowMetadata.columnsToWrapInCommonDictionary.size() > 1) { - auto inputSize = rowVector->size(); - inputRowMetadata.commonDictionaryIndices = - vectorFuzzer.fuzzIndices(inputSize, inputSize); - inputRowMetadata.commonDictionaryNulls = vectorFuzzer.fuzzNulls(inputSize); - } else { - inputRowMetadata.columnsToWrapInCommonDictionary.clear(); + // Generate the input row. + for (int inputIdx = 0; inputIdx < numInputs; ++inputIdx) { + std::vector children; + children.reserve(rowType->size() + 1); + for (auto i = 0; i < rowType->size(); ++i) { + if (std::binary_search( + metadata.columnsToWrapInCommonDictionary.begin(), + metadata.columnsToWrapInCommonDictionary.end(), + i)) { + // These will be wrapped in common dictionary later. + children.push_back(vectorFuzzer.fuzzFlat(rowType->childAt(i))); + } else { + children.push_back(vectorFuzzer.fuzz(rowType->childAt(i))); + } + } + + vector_size_t vecSize = vectorFuzzer.getOptions().vectorSize; + + // Modify the input row if needed based on the metadata. + if (metadata.columnsToWrapInCommonDictionary.size() < 2) { + // Avoid wrapping in common dictionary if there is only one column. + metadata.columnsToWrapInCommonDictionary.clear(); + } else { + auto commonIndices = vectorFuzzer.fuzzIndices(vecSize, vecSize); + auto commonNulls = vectorFuzzer.fuzzNulls(vecSize); + + for (auto colIdx : metadata.columnsToWrapInCommonDictionary) { + auto& child = children[colIdx]; + VELOX_CHECK_NOT_NULL(child); + child = BaseVector::wrapInDictionary( + commonNulls, commonIndices, vecSize, child); + } + } + // Append row number column to the input row. + auto names = rowType->names(); + names.push_back("row_number"); + + velox::test::VectorMaker vectorMaker{pool_.get()}; + children.push_back(vectorMaker.flatVector( + vecSize, [&](auto row) { return row; })); + + // Finally create the input row. + RowVectorPtr rowVector = vectorMaker.rowVector(names, children); + inputs.push_back({rowVector, SelectivityVector(vecSize)}); } - return inputRowMetadata; + // Return the input rows and the metadata. + return {inputs, metadata}; } void ExpressionFuzzerVerifier::reSeed() { @@ -250,7 +267,7 @@ RowVectorPtr ExpressionFuzzerVerifier::generateResultVectors( void ExpressionFuzzerVerifier::retryWithTry( std::vector plans, - const RowVectorPtr& rowVector, + std::vector inputsToRetry, const VectorPtr& resultVector, const InputRowMetadata& inputRowMetadata) { // Wrap each expression tree with 'try'. @@ -260,15 +277,14 @@ void ExpressionFuzzerVerifier::retryWithTry( plan->type(), std::vector{plan}, "try")); } - ResultOrError tryResult; + std::vector tryResults; // The function throws if anything goes wrong except // UNSUPPORTED_INPUT_UNCATCHABLE errors. try { - tryResult = verifier_.verify( + tryResults = verifier_.verify( tryPlans, - rowVector, - std::nullopt, + inputsToRetry, resultVector ? BaseVector::copy(*resultVector) : nullptr, false, // canThrow inputRowMetadata); @@ -278,31 +294,35 @@ void ExpressionFuzzerVerifier::retryWithTry( {&execCtx_, {false, ""}, referenceQueryRunner_}, *vectorFuzzer_, plans, - rowVector, - std::nullopt, + inputsToRetry, inputRowMetadata); } throw; } - if (tryResult.unsupportedInputUncatchableError) { - LOG(INFO) - << "Retry with try fails to find minimal subexpression due to UNSUPPORTED_INPUT_UNCATCHABLE error."; - return; - } - // Re-evaluate the original expression on rows that didn't produce an - // error (i.e. returned non-NULL results when evaluated with TRY). - SelectivityVector noErrorRows = extractNonNullRows(tryResult.result); + std::vector inputsToRetryWithoutErrors; + for (int i = 0; i < tryResults.size(); ++i) { + auto& tryResult = tryResults[i]; + if (tryResult.unsupportedInputUncatchableError) { + LOG(INFO) + << "Retry with try fails to find minimal subexpression due to UNSUPPORTED_INPUT_UNCATCHABLE error."; + return; + } + // Re-evaluate the original expression on rows that didn't produce an + // error (i.e. returned non-NULL results when evaluated with TRY). + inputsToRetry[i].activeRows = extractNonNullRows(tryResult.result); + if (inputsToRetry[i].activeRows.hasSelections()) { + inputsToRetryWithoutErrors.push_back(std::move(inputsToRetry[i])); + } + } - if (noErrorRows.hasSelections()) { - LOG(INFO) << "Retrying original expression on " << noErrorRows.end() - << " rows without errors"; + if (!inputsToRetryWithoutErrors.empty()) { + LOG(INFO) << "Retrying original expression on rows without errors"; try { verifier_.verify( plans, - rowVector, - noErrorRows, + inputsToRetryWithoutErrors, resultVector ? BaseVector::copy(*resultVector) : nullptr, false, // canThrow inputRowMetadata); @@ -312,8 +332,7 @@ void ExpressionFuzzerVerifier::retryWithTry( {&execCtx_, {false, ""}, referenceQueryRunner_}, *vectorFuzzer_, plans, - rowVector, - noErrorRows, + inputsToRetryWithoutErrors, inputRowMetadata); } throw; @@ -321,19 +340,6 @@ void ExpressionFuzzerVerifier::retryWithTry( } } -RowVectorPtr ExpressionFuzzerVerifier::appendRowNumberColumn( - RowVectorPtr& inputRow) { - auto names = asRowType(inputRow->type())->names(); - names.push_back("row_number"); - - auto& children = inputRow->children(); - velox::test::VectorMaker vectorMaker{pool_.get()}; - children.push_back(vectorMaker.flatVector( - inputRow->size(), [&](auto row) { return row; })); - - return vectorMaker.rowVector(names, children); -} - void ExpressionFuzzerVerifier::go() { VELOX_CHECK( options_.steps > 0 || options_.durationSeconds > 0, @@ -347,8 +353,8 @@ void ExpressionFuzzerVerifier::go() { size_t i = 0; size_t numFailed = 0; - // TODO: some expression will throw exception for NaN input, eg: IN predicate - // for floating point. remove this constraint once that are fixed + // TODO: some expression will throw exception for NaN input, eg: IN + // predicate for floating point. remove this constraint once that are fixed auto vectorOptions = vectorFuzzer_->getOptions(); vectorOptions.dataSpec = {false, false}; vectorFuzzer_->setOptions(vectorOptions); @@ -357,7 +363,8 @@ void ExpressionFuzzerVerifier::go() { << " (seed: " << currentSeed_ << ")"; // Generate multiple expression trees and input data vectors. They can - // re-use columns and share sub-expressions if the appropriate flag is set. + // re-use columns and share sub-expressions if the appropriate flag is + // set. int numExpressionTrees = boost::random::uniform_int_distribution( 1, options_.maxExpressionTreesPerStep)(rng_); auto [expressions, inputType, selectionStats] = @@ -373,19 +380,16 @@ void ExpressionFuzzerVerifier::go() { std::vector plans = std::move(expressions); - auto rowVector = vectorFuzzer_->fuzzInputRow(inputType); - InputRowMetadata inputRowMetadata = - generateInputRowMetadata(rowVector, *vectorFuzzer_); - rowVector = appendRowNumberColumn(rowVector); + auto [inputTestCases, inputRowMetadata] = + generateInput(inputType, *vectorFuzzer_); auto resultVectors = generateResultVectors(plans); - ResultOrError result; + std::vector results; try { - result = verifier_.verify( + results = verifier_.verify( plans, - rowVector, - std::nullopt, + inputTestCases, resultVectors ? BaseVector::copy(*resultVectors) : nullptr, true, // canThrow inputRowMetadata); @@ -395,26 +399,41 @@ void ExpressionFuzzerVerifier::go() { {&execCtx_, {false, ""}, referenceQueryRunner_}, *vectorFuzzer_, plans, - rowVector, - std::nullopt, + inputTestCases, inputRowMetadata); } throw; } - if (result.exceptionPtr) { + // If both paths threw compatible exceptions, we add a try() function to + // the expression's root and execute it again. This time the expressions + // cannot throw. Expressions that throw UNSUPPORTED_INPUT_UNCATCHABLE + // errors are not supported. + std::vector inputsToRetry; + bool anyInputsThrew = false; + bool anyInputsThrewButRetryable = false; + for (int j = 0; j < results.size(); j++) { + auto& result = results[j]; + if (result.exceptionPtr) { + anyInputsThrew = true; + if (!result.unsupportedInputUncatchableError && options_.retryWithTry) { + anyInputsThrewButRetryable = true; + inputsToRetry.push_back(inputTestCases[j]); + } + } else { + // If we re-try then also run these inputs to ensure the conditions + // during test run stay close to original, that is, multiple inputs are + // executed. + inputsToRetry.push_back(inputTestCases[j]); + } + } + if (anyInputsThrew) { ++numFailed; } - - // If both paths threw compatible exceptions, we add a try() function to - // the expression's root and execute it again. This time the expression - // cannot throw. Expressions that throw UNSUPPORTED_INPUT_UNCATCHABLE errors - // are not supported. - if (result.exceptionPtr && options_.retryWithTry && - !result.unsupportedInputUncatchableError) { + if (anyInputsThrewButRetryable) { LOG(INFO) << "Both paths failed with compatible exceptions. Retrying expression using try()."; - retryWithTry(plans, rowVector, resultVectors, inputRowMetadata); + retryWithTry(plans, inputsToRetry, resultVectors, inputRowMetadata); } LOG(INFO) << "==============================> Done with iteration " << i; diff --git a/velox/expression/fuzzer/ExpressionFuzzerVerifier.h b/velox/expression/fuzzer/ExpressionFuzzerVerifier.h index aaf6d7d5cdcea..ec9b4f2f27c1d 100644 --- a/velox/expression/fuzzer/ExpressionFuzzerVerifier.h +++ b/velox/expression/fuzzer/ExpressionFuzzerVerifier.h @@ -158,6 +158,23 @@ class ExpressionFuzzerVerifier { std::unordered_map& exprNameToStats_; }; + // Performs the following operations: + // 1. Randomly picks whether to generate either 1 or 2 input test cases. + // 2. Generates InputRowMetadata which contains the following: + // 2a. Randomly picked columns from the input row vector to wrap + // in lazy. Negative column indices represent lazy vectors that have been + // preloaded before feeding them to the evaluator. + // 2b. Randomly picked columns (2 or more) from the input row vector to + // wrap in a common dictionary layer. Only columns not already dictionary + // encoded are picked. + // Note: These lists are sorted on the absolute value of the entries. + // 3. Generates a Fuzzed input row vector and ensures that the columns picked + // in 2b are wrapped with the same dictionary. + // 4. Appends a row number column to the input row vector. + std::pair, InputRowMetadata> generateInput( + const RowTypePtr& rowType, + VectorFuzzer& vectorFuzzer); + /// Randomize initial result vector data to test for correct null and data /// setting in functions. RowVectorPtr generateResultVectors(std::vector& plans); @@ -171,7 +188,7 @@ class ExpressionFuzzerVerifier { /// Throws in case any of these steps fail. void retryWithTry( std::vector plans, - const RowVectorPtr& rowVector, + std::vector inputsToRetry, const VectorPtr& resultVectors, const InputRowMetadata& columnsToWrapInLazy); @@ -189,18 +206,6 @@ class ExpressionFuzzerVerifier { /// proportionOfTimesSelected numProcessedRows. void logStats(); - // Generates InputRowMetadata which contains the following: - // 1. Randomly picked columns from the input row vector to wrap - // in lazy. Negative column indices represent lazy vectors that have been - // preloaded before feeding them to the evaluator. - // 2. Randomly picked columns (2 or more) from the input row vector to - // wrap in a common dictionary layer. Only columns not already dictionary - // encoded are picked. - // Note: These lists are sorted on the absolute value of the entries. - InputRowMetadata generateInputRowMetadata( - const RowVectorPtr& rowVector, - VectorFuzzer& vectorFuzzer); - // Appends an additional row number column called 'row_number' at the end of // the 'inputRow'. This column is then used to line up rows when comparing // results against a reference database. diff --git a/velox/expression/fuzzer/FuzzerToolkit.cpp b/velox/expression/fuzzer/FuzzerToolkit.cpp index 41b261de7cfae..e57319c879545 100644 --- a/velox/expression/fuzzer/FuzzerToolkit.cpp +++ b/velox/expression/fuzzer/FuzzerToolkit.cpp @@ -19,13 +19,12 @@ namespace facebook::velox::fuzzer { namespace { -template -void saveStdVector(const std::vector& list, std::ostream& out) { +void saveStdVector(const std::vector& list, std::ostream& out) { // Size of the vector size_t size = list.size(); out.write((char*)&(size), sizeof(size)); out.write( - reinterpret_cast(list.data()), list.size() * sizeof(T)); + reinterpret_cast(list.data()), list.size() * sizeof(int)); } template @@ -158,43 +157,10 @@ void compareVectors( LOG(INFO) << "Two vectors match."; } -RowVectorPtr applyCommonDictionaryLayer( - const RowVectorPtr& rowVector, - const InputRowMetadata& inputRowMetadata) { - if (inputRowMetadata.columnsToWrapInCommonDictionary.empty()) { - return rowVector; - } - auto size = rowVector->size(); - auto& nulls = inputRowMetadata.commonDictionaryNulls; - auto& indices = inputRowMetadata.commonDictionaryIndices; - if (nulls) { - VELOX_CHECK_LE(bits::nbytes(size), nulls->size()); - } - VELOX_CHECK_LE(size, indices->size() / sizeof(vector_size_t)); - std::vector newInputs; - int listIndex = 0; - auto& columnsToWrap = inputRowMetadata.columnsToWrapInCommonDictionary; - for (int idx = 0; idx < rowVector->childrenSize(); idx++) { - auto& child = rowVector->childAt(idx); - VELOX_CHECK_NOT_NULL(child); - if (listIndex < columnsToWrap.size() && idx == columnsToWrap[listIndex]) { - newInputs.push_back( - BaseVector::wrapInDictionary(nulls, indices, size, child)); - listIndex++; - } else { - newInputs.push_back(child); - } - } - return std::make_shared( - rowVector->pool(), rowVector->type(), nullptr, size, newInputs); -} - void InputRowMetadata::saveToFile(const char* filePath) const { std::ofstream outputFile(filePath, std::ofstream::binary); saveStdVector(columnsToWrapInLazy, outputFile); saveStdVector(columnsToWrapInCommonDictionary, outputFile); - writeOptionalBuffer(commonDictionaryIndices, outputFile); - writeOptionalBuffer(commonDictionaryNulls, outputFile); outputFile.close(); } @@ -205,10 +171,8 @@ InputRowMetadata InputRowMetadata::restoreFromFile( std::ifstream in(filePath, std::ifstream::binary); ret.columnsToWrapInLazy = restoreStdVector(in); if (in.peek() != EOF) { - // this allows reading old files that only saved columnsToWrapInLazy. + // this check allows reading old files that only saved columnsToWrapInLazy. ret.columnsToWrapInCommonDictionary = restoreStdVector(in); - ret.commonDictionaryIndices = readOptionalBuffer(in, pool); - ret.commonDictionaryNulls = readOptionalBuffer(in, pool); } in.close(); return ret; diff --git a/velox/expression/fuzzer/FuzzerToolkit.h b/velox/expression/fuzzer/FuzzerToolkit.h index d9a7e7e8a2caa..e23f9783e9f53 100644 --- a/velox/expression/fuzzer/FuzzerToolkit.h +++ b/velox/expression/fuzzer/FuzzerToolkit.h @@ -43,6 +43,11 @@ struct SignatureTemplate { std::unordered_set typeVariables; }; +struct InputTestCase { + RowVectorPtr inputVector; + SelectivityVector activeRows; +}; + struct ResultOrError { RowVectorPtr result; std::exception_ptr exceptionPtr; @@ -123,11 +128,6 @@ struct InputRowMetadata { // increasing order) std::vector columnsToWrapInCommonDictionary; - // Dictionary indices and nulls for the common dictionary layer. Buffers are - // null if no columns are specified in `columnsToWrapInCommonDictionary`. - BufferPtr commonDictionaryIndices; - BufferPtr commonDictionaryNulls; - bool empty() const { return columnsToWrapInLazy.empty() && columnsToWrapInCommonDictionary.empty(); @@ -138,11 +138,4 @@ struct InputRowMetadata { const char* filePath, memory::MemoryPool* pool); }; - -// Wraps the columns in the row vector with a common dictionary layer. The -// column indices to wrap and the wrap itself is specified in -// `inputRowMetadata`. -RowVectorPtr applyCommonDictionaryLayer( - const RowVectorPtr& rowVector, - const InputRowMetadata& inputRowMetadata); } // namespace facebook::velox::fuzzer diff --git a/velox/expression/fuzzer/tests/FuzzerToolkitTest.cpp b/velox/expression/fuzzer/tests/FuzzerToolkitTest.cpp index 8dba6a0b1d87b..e453b6934b372 100644 --- a/velox/expression/fuzzer/tests/FuzzerToolkitTest.cpp +++ b/velox/expression/fuzzer/tests/FuzzerToolkitTest.cpp @@ -41,10 +41,7 @@ class FuzzerToolKitTest : public testing::Test, bool equals(const InputRowMetadata& lhs, const InputRowMetadata& rhs) { return lhs.columnsToWrapInLazy == rhs.columnsToWrapInLazy && lhs.columnsToWrapInCommonDictionary == - rhs.columnsToWrapInCommonDictionary && - compareBuffers( - lhs.commonDictionaryIndices, rhs.commonDictionaryIndices) && - compareBuffers(lhs.commonDictionaryNulls, rhs.commonDictionaryNulls); + rhs.columnsToWrapInCommonDictionary; } }; @@ -52,8 +49,6 @@ TEST_F(FuzzerToolKitTest, inputRowMetadataRoundTrip) { InputRowMetadata metadata; metadata.columnsToWrapInLazy = {1, -2, 3, -4, 5}; metadata.columnsToWrapInCommonDictionary = {1, 2, 3, 4, 5}; - metadata.commonDictionaryIndices = makeIndicesInReverse(5); - metadata.commonDictionaryNulls = makeNulls({true, false, true, false, true}); { auto path = exec::test::TempFilePath::create(); @@ -62,34 +57,5 @@ TEST_F(FuzzerToolKitTest, inputRowMetadataRoundTrip) { InputRowMetadata::restoreFromFile(path->getPath().c_str(), pool()); ASSERT_TRUE(equals(metadata, copy)); } - - metadata.commonDictionaryNulls = nullptr; - { - auto path = exec::test::TempFilePath::create(); - metadata.saveToFile(path->getPath().c_str()); - auto copy = - InputRowMetadata::restoreFromFile(path->getPath().c_str(), pool()); - ASSERT_TRUE(equals(metadata, copy)); - } - - metadata.columnsToWrapInCommonDictionary.clear(); - metadata.commonDictionaryIndices = nullptr; - { - auto path = exec::test::TempFilePath::create(); - metadata.saveToFile(path->getPath().c_str()); - auto copy = - InputRowMetadata::restoreFromFile(path->getPath().c_str(), pool()); - ASSERT_TRUE(equals(metadata, copy)); - } - - metadata.columnsToWrapInLazy.clear(); - metadata.commonDictionaryIndices = nullptr; - { - auto path = exec::test::TempFilePath::create(); - metadata.saveToFile(path->getPath().c_str()); - auto copy = - InputRowMetadata::restoreFromFile(path->getPath().c_str(), pool()); - ASSERT_TRUE(equals(metadata, copy)); - } } } // namespace facebook::velox::fuzzer::test diff --git a/velox/expression/tests/ExpressionRunner.cpp b/velox/expression/tests/ExpressionRunner.cpp index 7476415d506f1..d0f0645196444 100644 --- a/velox/expression/tests/ExpressionRunner.cpp +++ b/velox/expression/tests/ExpressionRunner.cpp @@ -72,14 +72,29 @@ RowVectorPtr evaluateAndPrintResults( return rowResult; } -vector_size_t adjustNumRows(vector_size_t numRows, vector_size_t size) { - return numRows > 0 && numRows < size ? numRows : size; +// Adjusts the number of rows to be evaluated to be at most numRows. +SelectivityVector adjustRows( + vector_size_t numRows, + const SelectivityVector& rows) { + if (numRows == 0 || numRows >= rows.countSelected()) { + return rows; + } + SelectivityVector adjustedRows(rows.end(), false); + for (int i = 0; i < rows.end() && numRows > 0; i++) { + if (rows.isValid(i)) { + adjustedRows.setValid(i, true); + numRows--; + } + } + return adjustedRows; } void saveResults( const RowVectorPtr& results, - const std::string& directoryPath) { - auto path = common::generateTempFilePath(directoryPath.c_str(), "vector"); + const std::string& directoryPath, + const std::string& fileName) { + auto path = + common::generateTempFilePath(directoryPath.c_str(), fileName.c_str()); VELOX_CHECK( path.has_value(), "Failed to create file for saving result vector in {} directory.", @@ -107,8 +122,71 @@ std::vector ExpressionRunner::parseSql( return typedExprs; } +// splits up strings into a vector of strings from a comma separated string +// e.g. "a,b,c" -> ["a", "b", "c"] +std::vector split(const std::string& s) { + std::vector result; + std::stringstream ss(s); + while (ss.good()) { + std::string substr; + getline(ss, substr, ','); + result.push_back(substr); + } + return result; +} + +// Ensures that all children of the input row vector indexed by 'indices' are +// wrapped in the same dictionary. To ensure peeling is employed, the BufferPtrs +// of the dictionary indices need to be the same. It is assumed that children +// indexed by 'indices' share the same dictionary indices but have separate +// BufferPtrs. +RowVectorPtr replicateCommonDictionaryLayer( + RowVectorPtr inputVector, + std::vector indices) { + if (inputVector == nullptr || indices.size() < 2) { + return inputVector; + } + std::vector children = inputVector->children(); + auto firstEncodedChild = children[indices[0]]; + VELOX_CHECK_EQ( + firstEncodedChild->encoding(), VectorEncoding::Simple::DICTIONARY); + auto commonDictionaryIndices = firstEncodedChild->wrapInfo(); + auto commonNulls = firstEncodedChild->nulls(); + for (auto i = 1; i < indices.size(); i++) { + auto& child = children[indices[i]]; + VELOX_CHECK_EQ(child->encoding(), VectorEncoding::Simple::DICTIONARY); + child = BaseVector::wrapInDictionary( + commonNulls, + commonDictionaryIndices, + child->size(), + child->valueVector()); + } + return std::make_shared( + inputVector->pool(), + inputVector->type(), + inputVector->nulls(), + inputVector->size(), + children); +} + +// Applies modifications to the input test cases based on the input row +// metadata, which includes making sure specific columns are wrapped in common +// dictionary and/or wrapped in a lazy shim layer. +void applyModificationsToInput( + std::vector& inputTestCases, + const InputRowMetadata& inputRowMetadata) { + for (auto& testCase : inputTestCases) { + auto& inputVector = testCase.inputVector; + inputVector = replicateCommonDictionaryLayer( + inputVector, inputRowMetadata.columnsToWrapInCommonDictionary); + inputVector = VectorFuzzer::fuzzRowChildrenToLazy( + inputVector, inputRowMetadata.columnsToWrapInLazy); + } +} + void ExpressionRunner::run( - const std::string& inputPath, + const std::string& inputPaths, + const std::string& inputSelectivityVectorPaths, const std::string& sql, const std::string& complexConstantsPath, const std::string& resultPath, @@ -129,47 +207,76 @@ void ExpressionRunner::run( : deserializerPool; core::ExecCtx execCtx{pool.get(), queryCtx.get()}; - RowVectorPtr inputVector; - - if (inputPath.empty()) { - inputVector = std::make_shared( - deserializerPool.get(), ROW({}), nullptr, 1, std::vector{}); - } else { - inputVector = std::dynamic_pointer_cast( - restoreVectorFromFile(inputPath.c_str(), deserializerPool.get())); - VELOX_CHECK_NOT_NULL( - inputVector, - "Input vector is not a RowVector: {}", - inputVector->toString()); - VELOX_CHECK_GT(inputVector->size(), 0, "Input vector must not be empty."); - } - fuzzer::InputRowMetadata inputRowMetadata; if (!inputRowMetadataPath.empty()) { inputRowMetadata = fuzzer::InputRowMetadata::restoreFromFile( inputRowMetadataPath.c_str(), pool.get()); } + std::vector inputTestCases; + if (inputPaths.empty()) { + inputTestCases.push_back( + {std::make_shared( + deserializerPool.get(), + ROW({}), + nullptr, + 1, + std::vector{}), + SelectivityVector(1)}); + } else { + std::vector inputPathsList = split(inputPaths); + std::vector inputSelectivityPaths = + split(inputSelectivityVectorPaths); + for (int i = 0; i < inputPathsList.size(); i++) { + auto inputVector = + std::dynamic_pointer_cast(restoreVectorFromFile( + inputPathsList[i].c_str(), deserializerPool.get())); + VELOX_CHECK_NOT_NULL( + inputVector, + "Input vector is not a RowVector: {}", + inputVector->toString()); + VELOX_CHECK_GT(inputVector->size(), 0, "Input vector must not be empty."); + if (inputSelectivityPaths.size() > i) { + inputTestCases.push_back( + {inputVector, + restoreSelectivityVectorFromFile( + inputSelectivityPaths[i].c_str())}); + } else { + inputTestCases.push_back( + {inputVector, SelectivityVector(inputVector->size(), true)}); + } + } + applyModificationsToInput(inputTestCases, inputRowMetadata); + } + + VELOX_CHECK(inputTestCases.size() > 0); + auto inputRowType = inputTestCases[0].inputVector->type(); + parse::registerTypeResolver(); if (mode == "query") { core::DuckDbQueryPlanner planner{pool.get()}; + for (int i = 0; i < inputTestCases.size(); ++i) { + auto& testCase = inputTestCases[i]; + if (inputRowType->size()) { + LOG(INFO) << "Registering input vector as table t: " + << inputRowType->toString(); + planner.registerTable("t", {testCase.inputVector}); + } - if (inputVector->type()->size()) { - LOG(INFO) << "Registering input vector as table t: " - << inputVector->type()->toString(); - planner.registerTable("t", {inputVector}); - } - - auto plan = planner.plan(sql); - auto results = exec::test::AssertQueryBuilder(plan).copyResults(pool.get()); + auto plan = planner.plan(sql); + auto results = + exec::test::AssertQueryBuilder(plan).copyResults(pool.get()); - // Print the results. - std::cout << "Result: " << results->type()->toString() << std::endl; - exec::test::printResults(results, std::cout); + // Print the results. + std::cout << "Result for Input " << i << ": " + << results->type()->toString() << std::endl; + exec::test::printResults(results, std::cout); - if (!storeResultPath.empty()) { - saveResults(results, storeResultPath); + if (!storeResultPath.empty()) { + auto fileName = fmt::format("resultVector_{}", i); + saveResults(results, storeResultPath, fileName); + } } return; } @@ -179,16 +286,13 @@ void ExpressionRunner::run( complexConstants = restoreVectorFromFile(complexConstantsPath.c_str(), pool.get()); } - auto typedExprs = - parseSql(sql, inputVector->type(), pool.get(), complexConstants); + auto typedExprs = parseSql(sql, inputRowType, pool.get(), complexConstants); VectorPtr resultVector; if (!resultPath.empty()) { resultVector = restoreVectorFromFile(resultPath.c_str(), pool.get()); } - SelectivityVector rows(adjustNumRows(numRows, inputVector->size())); - LOG(INFO) << "Evaluating SQL expression(s): " << sql; if (mode == "verify") { @@ -197,8 +301,7 @@ void ExpressionRunner::run( try { verifier.verify( typedExprs, - inputVector, - std::nullopt, + inputTestCases, std::move(resultVector), true, inputRowMetadata); @@ -210,27 +313,26 @@ void ExpressionRunner::run( std::move(verifier), fuzzer, typedExprs, - inputVector, - std::nullopt, + inputTestCases, inputRowMetadata); } throw; } - } else if (mode == "common") { - inputVector = VectorFuzzer::fuzzRowChildrenToLazy( - inputVector, inputRowMetadata.columnsToWrapInLazy); - inputVector = applyCommonDictionaryLayer(inputVector, inputRowMetadata); - exec::ExprSet exprSet(typedExprs, &execCtx); - auto results = evaluateAndPrintResults(exprSet, inputVector, rows, execCtx); - if (!storeResultPath.empty()) { - saveResults(results, storeResultPath); - } - } else if (mode == "simplified") { - exec::ExprSetSimplified exprSet(typedExprs, &execCtx); - auto results = evaluateAndPrintResults(exprSet, inputVector, rows, execCtx); - if (!storeResultPath.empty()) { - saveResults(results, storeResultPath); + } else if (mode == "common" || mode == "simplified") { + std::shared_ptr exprSet = mode == "common" + ? std::make_shared(typedExprs, &execCtx) + : std::make_shared(typedExprs, &execCtx); + for (int i = 0; i < inputTestCases.size(); ++i) { + auto& testCase = inputTestCases[i]; + SelectivityVector rows = adjustRows(numRows, testCase.activeRows); + std::cout << "Executing Input " << i << std::endl; + auto results = evaluateAndPrintResults( + *exprSet, testCase.inputVector, rows, execCtx); + if (!storeResultPath.empty()) { + auto fileName = fmt::format("resultVector_{}", i); + saveResults(results, storeResultPath, fileName); + } } } else { VELOX_FAIL("Unknown expression runner mode: [{}].", mode); diff --git a/velox/expression/tests/ExpressionRunner.h b/velox/expression/tests/ExpressionRunner.h index 76b8c9a3f108b..482ce8f43b496 100644 --- a/velox/expression/tests/ExpressionRunner.h +++ b/velox/expression/tests/ExpressionRunner.h @@ -31,8 +31,11 @@ namespace facebook::velox::test { /// supported) class ExpressionRunner { public: - /// @param inputPath The path to the on-disk vector that will be used as input - /// to feed to the expression. + /// @param inputPaths A comma separated list of paths to the on-disk vectors + /// that will be used as inputs to be fed to the expression. + /// @param inputSelectivityVectorPath A comma separated list of paths to the + /// on-disk selectivity vectors that correspond 1-to-1 with the inputs + /// to be fed to the expression. /// @param sql Comma-separated SQL expressions. /// @param complexConstantsPath The path to on-disk vector that stores complex /// subexpressions that aren't expressable in SQL (if any), used with @@ -63,7 +66,8 @@ class ExpressionRunner { /// User can refer to 'VectorSaver' class to see how to serialize/preserve /// vectors to disk. static void run( - const std::string& inputPath, + const std::string& inputPaths, + const std::string& inputSelectivityVectorPath, const std::string& sql, const std::string& complexConstantsPath, const std::string& resultPath, diff --git a/velox/expression/tests/ExpressionRunnerTest.cpp b/velox/expression/tests/ExpressionRunnerTest.cpp index c65c7a95daa05..0d926c9b1aefd 100644 --- a/velox/expression/tests/ExpressionRunnerTest.cpp +++ b/velox/expression/tests/ExpressionRunnerTest.cpp @@ -35,11 +35,19 @@ using facebook::velox::exec::test::PrestoQueryRunner; using facebook::velox::test::ReferenceQueryRunner; DEFINE_string( - input_path, + input_paths, "", - "Path for vector to be restored from disk. This will enable single run " - "of the fuzzer with the on-disk persisted repro information. This has to " - "be set with sql_path and optionally result_path."); + "Comma separated list of paths for vectors to be restored from disk. This " + "will enable single run of the fuzzer with the on-disk persisted repro " + "information. This has to be set with sql_path and optionally " + "result_path."); + +DEFINE_string( + input_selectivity_vector_paths, + "", + "Comma separated list of paths for selectivity vectors to be restored " + "from disk. The list needs to match 1-to-1 with the files specified in " + "input_paths. If not specified, all rows will be selected."); DEFINE_string( sql_path, @@ -189,14 +197,49 @@ static std::string checkAndReturnFilePath( return ""; } +static std::string getFilesWithPrefix( + const char* dirPath, + const std::string_view& prefix, + const std::string& flagName) { + std::vector filesPaths; + std::stringstream ss; + int numFilesFound = 0; + if (!std::filesystem::exists(dirPath)) { + LOG(ERROR) << "Directory does not exist: " << dirPath << std::endl; + return ""; + } + for (const auto& entry : std::filesystem::directory_iterator(dirPath)) { + if (entry.is_regular_file()) { + std::string filename = entry.path().filename(); + if (filename.find(prefix) == 0) { + if (++numFilesFound > 1) { + ss << ","; + } + ss << entry.path().string(); + } + } + } + LOG(INFO) << "Using " << flagName << " = " << ss.str(); + return ss.str(); +} + static void checkDirForExpectedFiles() { LOG(INFO) << "Searching input directory for expected files at " << FLAGS_fuzzer_repro_path; - FLAGS_input_path = FLAGS_input_path.empty() - ? checkAndReturnFilePath( - test::ExpressionVerifier::kInputVectorFileName, "input_path") - : FLAGS_input_path; + FLAGS_input_paths = FLAGS_input_paths.empty() + ? getFilesWithPrefix( + FLAGS_fuzzer_repro_path.c_str(), + test::ExpressionVerifier::kInputVectorFileNamePrefix, + "input_paths") + : FLAGS_input_paths; + FLAGS_input_selectivity_vector_paths = + FLAGS_input_selectivity_vector_paths.empty() + ? getFilesWithPrefix( + FLAGS_fuzzer_repro_path.c_str(), + test::ExpressionVerifier::kInputSelectivityVectorFileNamePrefix, + "input_selectivity_vector_paths") + : FLAGS_input_selectivity_vector_paths; FLAGS_result_path = FLAGS_result_path.empty() ? checkAndReturnFilePath( test::ExpressionVerifier::kResultVectorFileName, "result_path") @@ -260,7 +303,8 @@ int main(int argc, char** argv) { } test::ExpressionRunner::run( - FLAGS_input_path, + FLAGS_input_paths, + FLAGS_input_selectivity_vector_paths, sql, FLAGS_complex_constant_path, FLAGS_result_path, diff --git a/velox/expression/tests/ExpressionRunnerUnitTest.cpp b/velox/expression/tests/ExpressionRunnerUnitTest.cpp index 5bee29f8ac4c4..44f9af4dd885d 100644 --- a/velox/expression/tests/ExpressionRunnerUnitTest.cpp +++ b/velox/expression/tests/ExpressionRunnerUnitTest.cpp @@ -47,12 +47,11 @@ class ExpressionRunnerUnitTest : public testing::Test, public VectorTestBase { TEST_F(ExpressionRunnerUnitTest, run) { auto inputFile = exec::test::TempFilePath::create(); - auto sqlFile = exec::test::TempFilePath::create(); + auto selectivityVectorFile = exec::test::TempFilePath::create(); auto resultFile = exec::test::TempFilePath::create(); const auto inputPathStr = inputFile->getPath(); - const char* inputPath = inputPathStr.data(); const auto resultPathStr = resultFile->getPath(); - const char* resultPath = resultPathStr.data(); + const auto selectivityVectorPathStr = selectivityVectorFile->getPath(); const int vectorSize = 100; VectorMaker vectorMaker(pool_.get()); @@ -60,18 +59,23 @@ TEST_F(ExpressionRunnerUnitTest, run) { {"c0"}, {vectorMaker.flatVector(vectorSize, [](auto) { return "abc"; })}); - auto resultVector = vectorMaker.flatVector( - vectorSize, [](auto row) { return row * 100; }); - saveVectorToFile(inputVector.get(), inputPath); - saveVectorToFile(resultVector.get(), resultPath); + SelectivityVector rows(vectorSize); + auto resultVector = vectorMaker.rowVector( + {"output0"}, {vectorMaker.flatVector(vectorSize, [](auto row) { + return row * 100; + })}); + saveVectorToFile(inputVector.get(), inputPathStr.data()); + saveVectorToFile(resultVector.get(), resultPathStr.data()); + saveSelectivityVectorToFile(rows, selectivityVectorPathStr.data()); for (bool useSeperatePoolForInput : {true, false}) { LOG(INFO) << "Using useSeperatePoolForInput: " << useSeperatePoolForInput; EXPECT_NO_THROW(ExpressionRunner::run( - inputPath, + inputPathStr.data(), + selectivityVectorPathStr.data(), "length(c0)", "", - resultPath, + resultPathStr.data(), "verify", 0, "", diff --git a/velox/expression/tests/ExpressionVerifier.cpp b/velox/expression/tests/ExpressionVerifier.cpp index f1bc21773e632..0b9321bd4d6d6 100644 --- a/velox/expression/tests/ExpressionVerifier.cpp +++ b/velox/expression/tests/ExpressionVerifier.cpp @@ -28,19 +28,19 @@ namespace facebook::velox::test { using exec::test::ReferenceQueryErrorCode; namespace { -void logRowVector(const RowVectorPtr& rowVector) { - if (rowVector == nullptr) { - return; - } - VLOG(1) << rowVector->childrenSize() << " vectors as input:"; - for (const auto& child : rowVector->children()) { - VLOG(1) << "\t" << child->toString(/*recursive=*/true); - } +void logInputs(const std::vector& inputTestCases) { + for (const auto& [rowVector, rows] : inputTestCases) { + VLOG(1) << rowVector->childrenSize() << " vectors as input:"; + for (const auto& child : rowVector->children()) { + VLOG(1) << "\t" << child->toString(/*recursive=*/true); + } - VLOG(1) << "RowVector contents (" << rowVector->type()->toString() << "):"; + VLOG(1) << "RowVector contents (" << rowVector->type()->toString() << "):"; - for (vector_size_t i = 0; i < rowVector->size(); ++i) { - VLOG(1) << "\tAt " << i << ": " << rowVector->toString(i); + for (vector_size_t i = 0; i < rowVector->size(); ++i) { + VLOG(1) << "\tAt " << i << ": " << rowVector->toString(i); + } + VLOG(1) << "Rows to verify: " << rows.toString(rows.end()); } } @@ -93,17 +93,16 @@ RowVectorPtr reduceToSelectedRows( } } // namespace -fuzzer::ResultOrError ExpressionVerifier::verify( +std::vector ExpressionVerifier::verify( const std::vector& plans, - const RowVectorPtr& rowVector, - const std::optional& rowsToVerify, + const std::vector& inputTestCases, VectorPtr&& resultVector, bool canThrow, const InputRowMetadata& inputRowMetadata) { for (int i = 0; i < plans.size(); ++i) { LOG(INFO) << "Executing expression " << i << " : " << plans[i]->toString(); } - logRowVector(rowVector); + logInputs(inputTestCases); // Store data and expression in case of reproduction. VectorPtr copiedResult; @@ -131,278 +130,311 @@ fuzzer::ResultOrError ExpressionVerifier::verify( } if (options_.persistAndRunOnce) { persistReproInfo( - rowVector, inputRowMetadata, copiedResult, sql, complexConstants); + inputTestCases, + inputRowMetadata, + copiedResult, + sql, + complexConstants); } } - // Execute expression plan using both common and simplified evals. - std::vector commonEvalResult; - std::vector simplifiedEvalResult; - if (resultVector && resultVector->encoding() == VectorEncoding::Simple::ROW) { - auto resultRowVector = resultVector->asUnchecked(); - auto children = resultRowVector->children(); - commonEvalResult.resize(children.size()); - simplifiedEvalResult.resize(children.size()); - for (int i = 0; i < children.size(); ++i) { - commonEvalResult[i] = children[i]; - } - } else { - // For backwards compatibility where there was a single result and plan. - VELOX_CHECK_EQ(plans.size(), 1); - commonEvalResult.push_back(resultVector); - simplifiedEvalResult.resize(1); - } - std::exception_ptr exceptionCommonPtr; - std::exception_ptr exceptionSimplifiedPtr; - - VLOG(1) << "Starting common eval execution."; - SelectivityVector rows; - if (rowsToVerify.has_value()) { - rows = *rowsToVerify; - } else { - rows = SelectivityVector{rowVector ? rowVector->size() : 1}; - } + std::vector results; + // Share ExpressionSet between consecutive iterations to simulate its usage in + // FilterProject. + exec::ExprSet exprSetCommon( + plans, execCtx_, !options_.disableConstantFolding); + exec::ExprSetSimplified exprSetSimplified(plans, execCtx_); - // Execute with common expression eval path. Some columns of the input row - // vector will be wrapped in lazy as specified in 'columnsToWrapInLazy'. - - // Whether UNSUPPORTED_INPUT_UNCATCHABLE error is thrown from either the - // common or simplified evaluation path. This error is allowed to thrown only - // from one evaluation path because it is VeloxRuntimeError, hence cannot be - // suppressed by default nulls. - bool unsupportedInputUncatchableError{false}; - // Whether default null behavior takes place in the common evaluation path. If - // so, errors from Presto are allowed because Presto doesn't suppress error by - // default nulls. - bool defaultNull{false}; - try { - exec::ExprSet exprSetCommon( - plans, execCtx_, !options_.disableConstantFolding); - auto inputRowVector = rowVector; - VectorPtr copiedInput; - inputRowVector = VectorFuzzer::fuzzRowChildrenToLazy( - rowVector, inputRowMetadata.columnsToWrapInLazy); - inputRowVector = - applyCommonDictionaryLayer(inputRowVector, inputRowMetadata); - if (inputRowVector != rowVector) { - VLOG(1) << "Modified inputs for common eval path: "; - logRowVector(inputRowVector); - } - if (inputRowMetadata.columnsToWrapInLazy.empty()) { - // Copy loads lazy vectors so only do this when there are no lazy inputs. - copiedInput = BaseVector::copy(*inputRowVector); + int testCaseItr = 0; + for (auto [rowVector, rows] : inputTestCases) { + VLOG(1) << "Executing test case: " << testCaseItr++; + // Execute expression plan using both common and simplified evals. + std::vector commonEvalResult; + std::vector simplifiedEvalResult; + if (resultVector) { + VELOX_CHECK(resultVector->encoding() == VectorEncoding::Simple::ROW); + auto resultRowVector = resultVector->asUnchecked(); + auto children = resultRowVector->children(); + commonEvalResult.resize(children.size()); + simplifiedEvalResult.resize(children.size()); + for (int i = 0; i < children.size(); ++i) { + commonEvalResult[i] = children[i]; + } } + std::exception_ptr exceptionCommonPtr; + std::exception_ptr exceptionSimplifiedPtr; + + VLOG(1) << "Starting common eval execution."; + + // Execute with common expression eval path. Some columns of the input row + // vector will be wrapped in lazy as specified in 'columnsToWrapInLazy'. + + // Whether UNSUPPORTED_INPUT_UNCATCHABLE error is thrown from either the + // common or simplified evaluation path. This error is allowed to thrown + // only from one evaluation path because it is VeloxRuntimeError, hence + // cannot be suppressed by default nulls. + bool unsupportedInputUncatchableError{false}; + // Whether default null behavior takes place in the common evaluation path. + // If so, errors from Presto are allowed because Presto doesn't suppress + // error by default nulls. + bool defaultNull{false}; + try { + auto inputRowVector = rowVector; + VectorPtr copiedInput; + inputRowVector = VectorFuzzer::fuzzRowChildrenToLazy( + rowVector, inputRowMetadata.columnsToWrapInLazy); + if (inputRowMetadata.columnsToWrapInLazy.empty()) { + // Copy loads lazy vectors so only do this when there are no lazy + // inputs. + copiedInput = BaseVector::copy(*inputRowVector); + } - exec::EvalCtx evalCtxCommon(execCtx_, &exprSetCommon, inputRowVector.get()); - exprSetCommon.eval(rows, evalCtxCommon, commonEvalResult); - defaultNull = defaultNullRowsSkipped(exprSetCommon); - - if (copiedInput) { - // Flatten the input vector as an optimization if its very deeply nested. - fuzzer::compareVectors( - copiedInput, - BaseVector::copy(*inputRowVector), - "Copy of original input", - "Input after common", - rows); - } - } catch (const VeloxException& e) { - if (e.errorCode() == error_code::kUnsupportedInputUncatchable) { - unsupportedInputUncatchableError = true; - } else if (!(canThrow && e.isUserError())) { - if (!canThrow) { - LOG(ERROR) - << "Common eval wasn't supposed to throw, but it did. Aborting."; - } else if (!e.isUserError()) { - LOG(ERROR) - << "Common eval: VeloxRuntimeErrors other than UNSUPPORTED_INPUT_UNCATCHABLE error are not allowed."; + exec::EvalCtx evalCtxCommon( + execCtx_, &exprSetCommon, inputRowVector.get()); + exprSetCommon.eval( + 0, + exprSetCommon.size(), + true /*initialize*/, + rows, + evalCtxCommon, + commonEvalResult); + defaultNull = defaultNullRowsSkipped(exprSetCommon); + + if (copiedInput) { + // Flatten the input vector as an optimization if its very deeply + // nested. + fuzzer::compareVectors( + copiedInput, + BaseVector::copy(*inputRowVector), + "Copy of original input", + "Input after common", + rows); } + } catch (const VeloxException& e) { + if (e.errorCode() == error_code::kUnsupportedInputUncatchable) { + unsupportedInputUncatchableError = true; + } else if (!(canThrow && e.isUserError())) { + if (!canThrow) { + LOG(ERROR) + << "Common eval wasn't supposed to throw, but it did. Aborting."; + } else if (!e.isUserError()) { + LOG(ERROR) + << "Common eval: VeloxRuntimeErrors other than UNSUPPORTED_INPUT_UNCATCHABLE error are not allowed."; + } + persistReproInfoIfNeeded( + inputTestCases, + inputRowMetadata, + copiedResult, + sql, + complexConstants); + throw; + } + exceptionCommonPtr = std::current_exception(); + } catch (...) { + LOG(ERROR) + << "Common eval: Exceptions other than VeloxUserError or VeloxRuntimeError of UNSUPPORTED_INPUT_UNCATCHABLE are not allowed."; persistReproInfoIfNeeded( - rowVector, inputRowMetadata, copiedResult, sql, complexConstants); + inputTestCases, + inputRowMetadata, + copiedResult, + sql, + complexConstants); throw; } - exceptionCommonPtr = std::current_exception(); - } catch (...) { - LOG(ERROR) - << "Common eval: Exceptions other than VeloxUserError or VeloxRuntimeError of UNSUPPORTED_INPUT_UNCATCHABLE are not allowed."; - persistReproInfoIfNeeded( - rowVector, inputRowMetadata, copiedResult, sql, complexConstants); - throw; - } - - VLOG(1) << "Starting reference eval execution."; - if (referenceQueryRunner_ != nullptr) { - VLOG(1) << "Execute with reference DB."; - auto inputRowVector = rowVector; - inputRowVector = - applyCommonDictionaryLayer(inputRowVector, inputRowMetadata); - inputRowVector = reduceToSelectedRows(rowVector, rows); - auto projectionPlan = makeProjectionPlan(inputRowVector, plans); - auto referenceResultOrError = computeReferenceResults( - projectionPlan, {inputRowVector}, referenceQueryRunner_.get()); - - auto referenceEvalResult = referenceResultOrError.first; + VLOG(1) << "Starting reference eval execution."; + + if (referenceQueryRunner_ != nullptr) { + VLOG(1) << "Execute with reference DB."; + auto inputRowVector = reduceToSelectedRows(rowVector, rows); + auto projectionPlan = makeProjectionPlan(inputRowVector, plans); + auto referenceResultOrError = computeReferenceResults( + projectionPlan, {inputRowVector}, referenceQueryRunner_.get()); + + auto referenceEvalResult = referenceResultOrError.first; + + if (referenceResultOrError.second != + ReferenceQueryErrorCode::kReferenceQueryUnsupported) { + bool exceptionReference = + (referenceResultOrError.second != + ReferenceQueryErrorCode::kSuccess); + try { + // Compare results or exceptions (if any). Fail if anything is + // different. + if (exceptionCommonPtr || exceptionReference) { + // Throws in case only one evaluation path throws exception. + // Otherwise, return false to signal that the expression failed. + if (!(defaultNull && + referenceQueryRunner_->runnerType() == + ReferenceQueryRunner::RunnerType::kPrestoQueryRunner) && + !(exceptionCommonPtr && exceptionReference)) { + LOG(ERROR) << "Only " + << (exceptionCommonPtr ? "common" : "reference") + << " path threw exception:"; + if (exceptionCommonPtr) { + std::rethrow_exception(exceptionCommonPtr); + } else { + auto referenceSql = + referenceQueryRunner_->toSql(projectionPlan); + VELOX_FAIL( + "Reference path throws for query: {}", *referenceSql); + } + } + } else { + // Throws in case output is different. + VELOX_CHECK_EQ(commonEvalResult.size(), plans.size()); + VELOX_CHECK(referenceEvalResult.has_value()); + + std::vector types; + for (auto i = 0; i < commonEvalResult.size(); ++i) { + types.push_back(commonEvalResult[i]->type()); + } + auto commonEvalResultRow = std::make_shared( + execCtx_->pool(), + ROW(std::move(types)), + nullptr, + commonEvalResult[0]->size(), + commonEvalResult); + commonEvalResultRow = + reduceToSelectedRows(commonEvalResultRow, rows); + VELOX_CHECK( + exec::test::assertEqualResults( + referenceEvalResult.value(), + projectionPlan->outputType(), + {commonEvalResultRow}), + "Velox and reference DB results don't match"); + LOG(INFO) << "Verified results against reference DB"; + } + } catch (...) { + persistReproInfoIfNeeded( + inputTestCases, + inputRowMetadata, + copiedResult, + sql, + complexConstants); + throw; + } + } + } else { + VLOG(1) << "Execute with simplified expression eval path."; + try { + exec::EvalCtx evalCtxSimplified( + execCtx_, &exprSetSimplified, rowVector.get()); + + auto copy = BaseVector::copy(*rowVector); + exprSetSimplified.eval( + 0, + exprSetSimplified.size(), + true /*initialize*/, + rows, + evalCtxSimplified, + simplifiedEvalResult); + + // Flatten the input vector as an optimization if its very deeply + // nested. + fuzzer::compareVectors( + copy, + BaseVector::copy(*rowVector), + "Copy of original input", + "Input after simplified", + rows); + } catch (const VeloxException& e) { + if (e.errorCode() == error_code::kUnsupportedInputUncatchable) { + unsupportedInputUncatchableError = true; + } else if (!e.isUserError()) { + LOG(ERROR) + << "Simplified eval: VeloxRuntimeErrors other than UNSUPPORTED_INPUT_UNCATCHABLE error are not allowed."; + persistReproInfoIfNeeded( + inputTestCases, + inputRowMetadata, + copiedResult, + sql, + complexConstants); + throw; + } + exceptionSimplifiedPtr = std::current_exception(); + } catch (...) { + LOG(ERROR) + << "Simplified eval: Exceptions other than VeloxUserError or VeloxRuntimeError with UNSUPPORTED_INPUT are not allowed."; + persistReproInfoIfNeeded( + inputTestCases, + inputRowMetadata, + copiedResult, + sql, + complexConstants); + throw; + } - if (referenceResultOrError.second != - ReferenceQueryErrorCode::kReferenceQueryUnsupported) { - bool exceptionReference = - (referenceResultOrError.second != ReferenceQueryErrorCode::kSuccess); try { // Compare results or exceptions (if any). Fail if anything is // different. - if (exceptionCommonPtr || exceptionReference) { - // Throws in case only one evaluation path throws exception. - // Otherwise, return false to signal that the expression failed. - if (!(defaultNull && - referenceQueryRunner_->runnerType() == - ReferenceQueryRunner::RunnerType::kPrestoQueryRunner) && - !(exceptionCommonPtr && exceptionReference)) { - LOG(ERROR) << "Only " - << (exceptionCommonPtr ? "common" : "reference") - << " path threw exception:"; - if (exceptionCommonPtr) { - std::rethrow_exception(exceptionCommonPtr); - } else { - auto referenceSql = referenceQueryRunner_->toSql(projectionPlan); - VELOX_FAIL("Reference path throws for query: {}", *referenceSql); - } + if (exceptionCommonPtr || exceptionSimplifiedPtr) { + // UNSUPPORTED_INPUT_UNCATCHABLE errors are VeloxRuntimeErrors that + // cannot + // be suppressed by default NULLs. So it may happen that only one of + // the common and simplified path throws this error. In this case, we + // do not compare the exceptions. + if (!unsupportedInputUncatchableError) { + // Throws in case exceptions are not compatible. If they are + // compatible, return false to signal that the expression failed. + fuzzer::compareExceptions( + exceptionCommonPtr, exceptionSimplifiedPtr); } + results.push_back( + {nullptr, + exceptionCommonPtr ? exceptionCommonPtr : exceptionSimplifiedPtr, + unsupportedInputUncatchableError}); + continue; } else { // Throws in case output is different. VELOX_CHECK_EQ(commonEvalResult.size(), plans.size()); - VELOX_CHECK(referenceEvalResult.has_value()); - - std::vector types; - for (auto i = 0; i < commonEvalResult.size(); ++i) { - types.push_back(commonEvalResult[i]->type()); + VELOX_CHECK_EQ(simplifiedEvalResult.size(), plans.size()); + for (int i = 0; i < plans.size(); ++i) { + fuzzer::compareVectors( + commonEvalResult[i], + simplifiedEvalResult[i], + "common path results ", + "simplified path results", + rows); } - auto commonEvalResultRow = std::make_shared( - execCtx_->pool(), - ROW(std::move(types)), - nullptr, - commonEvalResult[0]->size(), - commonEvalResult); - commonEvalResultRow = reduceToSelectedRows(commonEvalResultRow, rows); - VELOX_CHECK( - exec::test::assertEqualResults( - referenceEvalResult.value(), - projectionPlan->outputType(), - {commonEvalResultRow}), - "Velox and reference DB results don't match"); - LOG(INFO) << "Verified results against reference DB"; } } catch (...) { persistReproInfoIfNeeded( - rowVector, inputRowMetadata, copiedResult, sql, complexConstants); + inputTestCases, + inputRowMetadata, + copiedResult, + sql, + complexConstants); throw; } } - } else { - VLOG(1) << "Execute with simplified expression eval path."; - try { - exec::ExprSetSimplified exprSetSimplified(plans, execCtx_); - auto inputRowVector = - applyCommonDictionaryLayer(rowVector, inputRowMetadata); - if (inputRowVector != rowVector) { - VLOG(1) << "Modified inputs for simplified eval path: "; - logRowVector(inputRowVector); - } - exec::EvalCtx evalCtxSimplified( - execCtx_, &exprSetSimplified, inputRowVector.get()); - - auto copy = BaseVector::copy(*inputRowVector); - exprSetSimplified.eval(rows, evalCtxSimplified, simplifiedEvalResult); - - // Flatten the input vector as an optimization if its very deeply - // nested. - fuzzer::compareVectors( - copy, - BaseVector::copy(*inputRowVector), - "Copy of original input", - "Input after simplified", - rows); - } catch (const VeloxException& e) { - if (e.errorCode() == error_code::kUnsupportedInputUncatchable) { - unsupportedInputUncatchableError = true; - } else if (!e.isUserError()) { - LOG(ERROR) - << "Simplified eval: VeloxRuntimeErrors other than UNSUPPORTED_INPUT_UNCATCHABLE error are not allowed."; - persistReproInfoIfNeeded( - rowVector, inputRowMetadata, copiedResult, sql, complexConstants); - throw; - } - exceptionSimplifiedPtr = std::current_exception(); - } catch (...) { - LOG(ERROR) - << "Simplified eval: Exceptions other than VeloxUserError or VeloxRuntimeError with UNSUPPORTED_INPUT are not allowed."; - persistReproInfoIfNeeded( - rowVector, inputRowMetadata, copiedResult, sql, complexConstants); - throw; + if (!options_.reproPersistPath.empty() && options_.persistAndRunOnce) { + // A guard to make sure it runs only once with persistAndRunOnce flag + // turned on. It shouldn't reach here normally since the flag is used to + // persist repro info for crash failures. But if it hasn't crashed by now, + // we still don't want another iteration. + LOG(WARNING) + << "Iteration succeeded with --persist_and_run_once flag enabled " + "(expecting crash failure)"; + exit(0); } - try { - // Compare results or exceptions (if any). Fail if anything is - // different. - if (exceptionCommonPtr || exceptionSimplifiedPtr) { - // UNSUPPORTED_INPUT_UNCATCHABLE errors are VeloxRuntimeErrors that - // cannot - // be suppressed by default NULLs. So it may happen that only one of the - // common and simplified path throws this error. In this case, we do not - // compare the exceptions. - if (!unsupportedInputUncatchableError) { - // Throws in case exceptions are not compatible. If they are - // compatible, return false to signal that the expression failed. - fuzzer::compareExceptions(exceptionCommonPtr, exceptionSimplifiedPtr); - } - return { - nullptr, - exceptionCommonPtr ? exceptionCommonPtr : exceptionSimplifiedPtr, - unsupportedInputUncatchableError}; - } else { - // Throws in case output is different. - VELOX_CHECK_EQ(commonEvalResult.size(), plans.size()); - VELOX_CHECK_EQ(simplifiedEvalResult.size(), plans.size()); - for (int i = 0; i < plans.size(); ++i) { - fuzzer::compareVectors( - commonEvalResult[i], - simplifiedEvalResult[i], - "common path results ", - "simplified path results", - rows); - } - } - } catch (...) { - persistReproInfoIfNeeded( - rowVector, inputRowMetadata, copiedResult, sql, complexConstants); - throw; + if (exceptionCommonPtr) { + results.push_back( + {nullptr, exceptionCommonPtr, unsupportedInputUncatchableError}); + } else { + results.push_back( + {VectorMaker(commonEvalResult[0]->pool()).rowVector(commonEvalResult), + nullptr, + unsupportedInputUncatchableError}); } } - - if (!options_.reproPersistPath.empty() && options_.persistAndRunOnce) { - // A guard to make sure it runs only once with persistAndRunOnce flag - // turned on. It shouldn't reach here normally since the flag is used to - // persist repro info for crash failures. But if it hasn't crashed by now, - // we still don't want another iteration. - LOG(WARNING) - << "Iteration succeeded with --persist_and_run_once flag enabled " - "(expecting crash failure)"; - exit(0); - } - - if (exceptionCommonPtr) { - return {nullptr, exceptionCommonPtr, unsupportedInputUncatchableError}; - } else { - return { - VectorMaker(commonEvalResult[0]->pool()).rowVector(commonEvalResult), - nullptr, - unsupportedInputUncatchableError}; - } + return results; } void ExpressionVerifier::persistReproInfoIfNeeded( - const VectorPtr& inputVector, + const std::vector& inputTestCases, const InputRowMetadata& inputRowMetadata, const VectorPtr& resultVector, const std::string& sql, @@ -411,17 +443,18 @@ void ExpressionVerifier::persistReproInfoIfNeeded( LOG(INFO) << "Skipping persistence because repro path is empty."; } else if (!options_.persistAndRunOnce) { persistReproInfo( - inputVector, inputRowMetadata, resultVector, sql, complexConstants); + inputTestCases, inputRowMetadata, resultVector, sql, complexConstants); } } void ExpressionVerifier::persistReproInfo( - const VectorPtr& inputVector, + const std::vector& inputTestCases, const InputRowMetadata& inputRowMetadata, const VectorPtr& resultVector, const std::string& sql, const std::vector& complexConstants) { - std::string inputPath; + std::vector inputPaths; + std::vector inputSelectivityVectorPaths; std::string inputRowMetadataPath; std::string resultPath; std::string sqlPath; @@ -438,12 +471,30 @@ void ExpressionVerifier::persistReproInfo( LOG(INFO) << "Failed to create directory for persisting repro info."; return; } - // Saving input vector - inputPath = fmt::format("{}/{}", dirPath->c_str(), kInputVectorFileName); - try { - saveVectorToFile(inputVector.get(), inputPath.c_str()); - } catch (std::exception& e) { - inputPath = e.what(); + // Saving input test cases + for (int i = 0; i < inputTestCases.size(); i++) { + auto filePath = fmt::format( + "{}/{}_{}", dirPath->c_str(), kInputVectorFileNamePrefix, i); + try { + saveVectorToFile(inputTestCases[i].inputVector.get(), filePath.c_str()); + inputPaths.push_back(filePath); + } catch (std::exception& e) { + inputPaths.clear(); + inputPaths.push_back(e.what()); + break; + } + + filePath = fmt::format( + "{}/{}_{}", dirPath->c_str(), kInputSelectivityVectorFileNamePrefix, i); + try { + saveSelectivityVectorToFile( + inputTestCases[i].activeRows, filePath.c_str()); + inputSelectivityVectorPaths.push_back(filePath); + } catch (std::exception& e) { + inputSelectivityVectorPaths.clear(); + inputSelectivityVectorPaths.push_back(e.what()); + break; + } } // Saving the list of column indices that are to be wrapped in lazy. @@ -491,7 +542,9 @@ void ExpressionVerifier::persistReproInfo( std::stringstream ss; ss << "Persisted input: --fuzzer_repro_path " << dirPath.value(); - ss << " --input_path " << inputPath; + ss << " --input_paths " << boost::algorithm::join(inputPaths, ", ") + << " --input_selectivity_vector_paths " + << boost::algorithm::join(inputSelectivityVectorPaths, ", "); if (resultVector) { ss << " --result_path " << resultPath; } @@ -516,14 +569,13 @@ class MinimalSubExpressionFinder { // Tries subexpressions of plan until finding the minimal failing subtree. void findMinimalExpression( core::TypedExprPtr plan, - const RowVectorPtr& rowVector, - const std::optional& rowsToVerify, + const std::vector& inputTestCases, const InputRowMetadata& inputRowMetadata) { - if (verifyWithResults(plan, rowVector, rowsToVerify, inputRowMetadata)) { + if (verifyWithResults(plan, inputTestCases, inputRowMetadata)) { errorExit("Retry should have failed"); } bool minimalFound = - findMinimalRecursive(plan, rowVector, rowsToVerify, inputRowMetadata); + findMinimalRecursive(plan, inputTestCases, inputRowMetadata); if (minimalFound) { errorExit("Found minimal failing expression."); } else { @@ -542,16 +594,14 @@ class MinimalSubExpressionFinder { // breakpoint inside this to debug failures. bool findMinimalRecursive( core::TypedExprPtr plan, - const RowVectorPtr& rowVector, - const std::optional& rowsToVerify, + const std::vector& inputTestCases, const InputRowMetadata& inputRowMetadata) { bool anyFailed = false; for (auto& input : plan->inputs()) { - if (!verifyWithResults( - input, rowVector, rowsToVerify, inputRowMetadata)) { + if (!verifyWithResults(input, inputTestCases, inputRowMetadata)) { anyFailed = true; - bool minimalFound = findMinimalRecursive( - input, rowVector, rowsToVerify, inputRowMetadata); + bool minimalFound = + findMinimalRecursive(input, inputTestCases, inputRowMetadata); if (minimalFound) { return true; } @@ -560,10 +610,10 @@ class MinimalSubExpressionFinder { if (!anyFailed) { LOG(INFO) << "Failed with all children succeeding: " << plan->toString(); // Re-running the minimum failed. Put breakpoint here to debug. - verifyWithResults(plan, rowVector, rowsToVerify, inputRowMetadata); + verifyWithResults(plan, inputTestCases, inputRowMetadata); if (!inputRowMetadata.columnsToWrapInLazy.empty()) { LOG(INFO) << "Trying without lazy:"; - if (verifyWithResults(plan, rowVector, rowsToVerify, {})) { + if (verifyWithResults(plan, inputTestCases, {})) { LOG(INFO) << "Minimal failure succeeded without lazy vectors"; } } @@ -579,17 +629,16 @@ class MinimalSubExpressionFinder { // contents in result vector. bool verifyWithResults( core::TypedExprPtr plan, - const RowVectorPtr& rowVector, - const std::optional& rowsToVerify, + const std::vector& inputTestCases, const InputRowMetadata& inputRowMetadata) { VectorPtr result; LOG(INFO) << "Running with empty results vector :" << plan->toString(); bool emptyResult = - verifyPlan(plan, rowVector, rowsToVerify, inputRowMetadata, result); + verifyPlan(plan, inputTestCases, inputRowMetadata, result); LOG(INFO) << "Running with non empty vector :" << plan->toString(); result = vectorFuzzer_.fuzzFlat(plan->type()); bool filledResult = - verifyPlan(plan, rowVector, rowsToVerify, inputRowMetadata, result); + verifyPlan(plan, inputTestCases, inputRowMetadata, result); if (emptyResult != filledResult) { LOG(ERROR) << fmt::format( "Different results for empty vs populated ! Empty result = {} filledResult = {}", @@ -603,8 +652,7 @@ class MinimalSubExpressionFinder { // Returns true if the verification is successful. bool verifyPlan( core::TypedExprPtr plan, - const RowVectorPtr& rowVector, - const std::optional& rowsToVerify, + const std::vector& inputTestCases, const InputRowMetadata& inputRowMetadata, VectorPtr results) { // Turn off unnecessary logging. @@ -614,8 +662,7 @@ class MinimalSubExpressionFinder { try { verifier_.verify( {plan}, - rowVector, - rowsToVerify, + inputTestCases, results ? BaseVector::copy(*results) : nullptr, true, // canThrow inputRowMetadata); @@ -635,8 +682,7 @@ void computeMinimumSubExpression( ExpressionVerifier&& minimalVerifier, VectorFuzzer& fuzzer, const std::vector& plans, - const RowVectorPtr& rowVector, - const std::optional& rowsToVerify, + const std::vector& inputTestCases, const InputRowMetadata& inputRowMetadata) { auto finder = MinimalSubExpressionFinder(std::move(minimalVerifier), fuzzer); if (plans.size() > 1) { @@ -648,8 +694,7 @@ void computeMinimumSubExpression( for (auto plan : plans) { LOG(INFO) << "============================================"; LOG(INFO) << "Finding minimal subexpression for plan:" << plan->toString(); - finder.findMinimalExpression( - plan, rowVector, rowsToVerify, inputRowMetadata); + finder.findMinimalExpression(plan, inputTestCases, inputRowMetadata); LOG(INFO) << "============================================"; } } diff --git a/velox/expression/tests/ExpressionVerifier.h b/velox/expression/tests/ExpressionVerifier.h index 85d6771d6a2f5..e079347ea0b38 100644 --- a/velox/expression/tests/ExpressionVerifier.h +++ b/velox/expression/tests/ExpressionVerifier.h @@ -42,7 +42,10 @@ class ExpressionVerifier { public: // File names used to persist data required for reproducing a failed test // case. - static constexpr const std::string_view kInputVectorFileName = "input_vector"; + static constexpr const std::string_view kInputVectorFileNamePrefix = + "input_vector"; + static constexpr const std::string_view + kInputSelectivityVectorFileNamePrefix = "input_selectivity_vector"; static constexpr const std::string_view kInputRowMetadataFileName = "input_row_metadata"; static constexpr const std::string_view kResultVectorFileName = @@ -61,25 +64,20 @@ class ExpressionVerifier { // Executes expressions using common path (all evaluation // optimizations) and compares the result with either the simplified path or a - // reference query runner. An optional selectivity vector 'rowsToVerify' can - // be passed which specifies which rows to evaluate and verify. If its not - // provided (by passing std::nullopt) then all rows will be verified. - // Additionally, a list of column indices can be passed via - // 'columnsToWrapInLazy' which specify the columns/children in the input row - // vector that should be wrapped in a lazy layer before running it through the - // common evaluation path. The list can contain negative column indices that - // represent lazy vectors that should be preloaded before being fed to the - // evaluator. This list is sorted on the absolute value of the entries. + // reference query runner. This execution is done for each input test cases + // where the ExprSet for the common and simplified is reused between test + // cases to simulate batches being processed via a ProjectFilter operator. // Returns: + // A vector of ResultOrError objects, one for each InputTestCase. Each result + // contains: // - result of evaluating the expressions if both paths succeeded and // returned the exact same vectors. // - exception thrown by the common path if both paths failed with compatible // exceptions. // - throws otherwise (incompatible exceptions or different results). - fuzzer::ResultOrError verify( + std::vector verify( const std::vector& plans, - const RowVectorPtr& rowVector, - const std::optional& rowsToVerify, + const std::vector& inputTestCases, VectorPtr&& resultVector, bool canThrow, const InputRowMetadata& inputRowMetadata = {}); @@ -88,7 +86,7 @@ class ExpressionVerifier { // Utility method used to serialize the relevant data required to repro a // crash. void persistReproInfo( - const VectorPtr& inputVector, + const std::vector& inputTestCases, const InputRowMetadata& inputRowMetadata, const VectorPtr& resultVector, const std::string& sql, @@ -98,7 +96,7 @@ class ExpressionVerifier { // options_.reproPersistPath is set and is not persistAndRunOnce. Do nothing // otherwise. void persistReproInfoIfNeeded( - const VectorPtr& inputVector, + const std::vector& inputTestCases, const InputRowMetadata& inputRowMetadata, const VectorPtr& resultVector, const std::string& sql, @@ -117,7 +115,6 @@ void computeMinimumSubExpression( ExpressionVerifier&& minimalVerifier, VectorFuzzer& fuzzer, const std::vector& plans, - const RowVectorPtr& rowVector, - const std::optional& rowsToVerify, + const std::vector& inputTestCases, const InputRowMetadata& inputRowMetadata); } // namespace facebook::velox::test diff --git a/velox/vector/VectorSaver.cpp b/velox/vector/VectorSaver.cpp index 238325d6d2864..bd9ad77ee9f76 100644 --- a/velox/vector/VectorSaver.cpp +++ b/velox/vector/VectorSaver.cpp @@ -20,11 +20,6 @@ namespace facebook::velox { -void writeBuffer(const BufferPtr& buffer, std::ostream& out); -void writeOptionalBuffer(const BufferPtr& buffer, std::ostream& out); -BufferPtr readBuffer(std::istream& in, memory::MemoryPool* pool); -BufferPtr readOptionalBuffer(std::istream& in, memory::MemoryPool* pool); - namespace { enum class Encoding : int8_t { @@ -107,6 +102,45 @@ Encoding readEncoding(std::istream& in) { } } +/// Serializes a BufferPtr into binary format and writes it to the +/// provided output stream. 'buffer' must be non-null. +void writeBuffer(const BufferPtr& buffer, std::ostream& out) { + write(buffer->size(), out); + out.write(buffer->as(), buffer->size()); +} + +/// Serializes a optional BufferPtr into binary format and writes it to the +/// provided output stream. +void writeOptionalBuffer(const BufferPtr& buffer, std::ostream& out) { + if (buffer) { + write(true, out); + writeBuffer(buffer, out); + } else { + write(false, out); + } +} + +/// Deserializes a BufferPtr serialized by 'writeBuffer' from the provided +/// input stream. +BufferPtr readBuffer(std::istream& in, memory::MemoryPool* pool) { + auto numBytes = read(in); + auto buffer = AlignedBuffer::allocate(numBytes, pool); + auto rawBuffer = buffer->asMutable(); + in.read(rawBuffer, numBytes); + return buffer; +} + +/// Deserializes a optional BufferPtr serialized by 'writeOptionalBuffer' from +/// the provided input stream. +BufferPtr readOptionalBuffer(std::istream& in, memory::MemoryPool* pool) { + bool hasBuffer = read(in); + if (hasBuffer) { + return readBuffer(in, pool); + } + + return nullptr; +} + template VectorPtr createFlat( const TypePtr& type, @@ -548,38 +582,6 @@ VectorPtr readLazyVector( } } // namespace -void writeBuffer(const BufferPtr& buffer, std::ostream& out) { - VELOX_CHECK_NOT_NULL(buffer); - write(buffer->size(), out); - out.write(buffer->as(), buffer->size()); -} - -void writeOptionalBuffer(const BufferPtr& buffer, std::ostream& out) { - if (buffer) { - write(true, out); - writeBuffer(buffer, out); - } else { - write(false, out); - } -} - -BufferPtr readBuffer(std::istream& in, memory::MemoryPool* pool) { - auto numBytes = read(in); - auto buffer = AlignedBuffer::allocate(numBytes, pool); - auto rawBuffer = buffer->asMutable(); - in.read(rawBuffer, numBytes); - return buffer; -} - -BufferPtr readOptionalBuffer(std::istream& in, memory::MemoryPool* pool) { - bool hasBuffer = read(in); - if (hasBuffer) { - return readBuffer(in, pool); - } - - return nullptr; -} - void saveType(const TypePtr& type, std::ostream& out) { auto serialized = toJson(type->serialize()); write(serialized, out); @@ -745,6 +747,22 @@ SelectivityVector restoreSelectivityVector(std::istream& in) { rows.setFromBits(reinterpret_cast(bits.data()), size); return rows; } + +void saveSelectivityVectorToFile( + const SelectivityVector& rows, + const char* filePath) { + std::ofstream outputFile(filePath, std::ofstream::binary); + saveSelectivityVector(rows, outputFile); + outputFile.close(); +} + +SelectivityVector restoreSelectivityVectorFromFile(const char* filePath) { + std::ifstream inputFile(filePath, std::ifstream::binary); + VELOX_CHECK(!inputFile.fail(), "Cannot open file: {}", filePath); + auto result = restoreSelectivityVector(inputFile); + inputFile.close(); + return result; +} } // namespace facebook::velox template <> diff --git a/velox/vector/VectorSaver.h b/velox/vector/VectorSaver.h index 63f47fbeba7f3..69b56d9df42a2 100644 --- a/velox/vector/VectorSaver.h +++ b/velox/vector/VectorSaver.h @@ -61,20 +61,10 @@ void saveSelectivityVector(const SelectivityVector& rows, std::ostream& out); /// the provided input stream. SelectivityVector restoreSelectivityVector(std::istream& in); -/// Serializes a BufferPtr into binary format and writes it to the -/// provided output stream. 'buffer' must be non-null. -void writeBuffer(const BufferPtr& buffer, std::ostream& out); +void saveSelectivityVectorToFile( + const SelectivityVector& rows, + const char* filePath); -/// Serializes a optional BufferPtr into binary format and writes it to the -/// provided output stream. -void writeOptionalBuffer(const BufferPtr& buffer, std::ostream& out); - -/// Deserializes a BufferPtr serialized by 'writeBuffer' from the provided -/// input stream. -BufferPtr readBuffer(std::istream& in, memory::MemoryPool* pool); - -/// Deserializes a optional BufferPtr serialized by 'writeOptionalBuffer' from -/// the provided input stream. -BufferPtr readOptionalBuffer(std::istream& in, memory::MemoryPool* pool); +SelectivityVector restoreSelectivityVectorFromFile(const char* filePath); } // namespace facebook::velox