diff --git a/velox/connectors/hive/HiveConnectorSplit.cpp b/velox/connectors/hive/HiveConnectorSplit.cpp
index ed69eb7d6303..e05411da9424 100644
--- a/velox/connectors/hive/HiveConnectorSplit.cpp
+++ b/velox/connectors/hive/HiveConnectorSplit.cpp
@@ -61,7 +61,8 @@ folly::dynamic HiveConnectorSplit::serialize() const {
     customSplitInfoObj[key] = value;
   }
   obj["customSplitInfo"] = customSplitInfoObj;
-  obj["extraFileInfo"] = *extraFileInfo;
+  obj["extraFileInfo"] =
+      extraFileInfo == nullptr ? nullptr : folly::dynamic(*extraFileInfo);
 
   folly::dynamic serdeParametersObj = folly::dynamic::object;
   for (const auto& [key, value] : serdeParameters) {
@@ -118,8 +119,9 @@ std::shared_ptr<HiveConnectorSplit> HiveConnectorSplit::create(
     customSplitInfo[key.asString()] = value.asString();
   }
 
-  std::shared_ptr<std::string> extraFileInfo =
-      std::make_shared<std::string>(obj["extraFileInfo"].asString());
+  std::shared_ptr<std::string> extraFileInfo = obj["extraFileInfo"].isNull()
+      ? nullptr
+      : std::make_shared<std::string>(obj["extraFileInfo"].asString());
   std::unordered_map<std::string, std::string> serdeParameters;
   for (const auto& [key, value] : obj["serdeParameters"].items()) {
     serdeParameters[key.asString()] = value.asString();
diff --git a/velox/connectors/hive/tests/HiveConnectorSerDeTest.cpp b/velox/connectors/hive/tests/HiveConnectorSerDeTest.cpp
index f44bf9348fe4..828547d74b4f 100644
--- a/velox/connectors/hive/tests/HiveConnectorSerDeTest.cpp
+++ b/velox/connectors/hive/tests/HiveConnectorSerDeTest.cpp
@@ -81,7 +81,11 @@ class HiveConnectorSerDeTest : public exec::test::HiveConnectorTestBase {
       ASSERT_EQ(value, clone->customSplitInfo.at(key));
     }
 
-    ASSERT_EQ(*split.extraFileInfo, *clone->extraFileInfo);
+    if (split.extraFileInfo != nullptr) {
+      ASSERT_EQ(*split.extraFileInfo, *clone->extraFileInfo);
+    } else {
+      ASSERT_EQ(clone->extraFileInfo, nullptr);
+    }
     ASSERT_EQ(split.serdeParameters.size(), clone->serdeParameters.size());
     for (const auto& [key, value] : split.serdeParameters) {
       ASSERT_EQ(value, clone->serdeParameters.at(key));
@@ -216,7 +220,7 @@ TEST_F(HiveConnectorSerDeTest, hiveConnectorSplit) {
   FileProperties fileProperties{
       .fileSize = 2048, .modificationTime = std::nullopt};
   const auto properties = std::optional(fileProperties);
-  const auto split = HiveConnectorSplit(
+  const auto split1 = HiveConnectorSplit(
       connectorId,
       filePath,
       fileFormat,
@@ -229,8 +233,24 @@ TEST_F(HiveConnectorSerDeTest, hiveConnectorSplit) {
       serdeParameters,
       splitWeight,
       infoColumns,
+      properties);
+  testSerde(split1);
+
+  const auto split2 = HiveConnectorSplit(
+      connectorId,
+      filePath,
+      fileFormat,
+      start,
+      length,
+      {},
+      tableBucketNumber,
+      customSplitInfo,
+      nullptr,
+      {},
+      splitWeight,
+      {},
       std::nullopt);
-  testSerde(split);
+  testSerde(split2);
 }
 
 } // namespace
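The serialize()/create() changes above follow the usual folly::dynamic convention for a nullable field: write JSON null instead of dereferencing a null pointer, and check isNull() before reconstructing the value. A minimal standalone sketch of that pattern, assuming only folly and a hypothetical struct (not a Velox type):

#include <folly/dynamic.h>

#include <memory>
#include <string>

// Hypothetical holder of an optional string field, mirroring the role of
// HiveConnectorSplit::extraFileInfo.
struct Info {
  std::shared_ptr<std::string> extra; // may be nullptr

  folly::dynamic serialize() const {
    folly::dynamic obj = folly::dynamic::object;
    // A null pointer becomes JSON null rather than a crash.
    obj["extra"] = extra == nullptr ? nullptr : folly::dynamic(*extra);
    return obj;
  }

  static Info deserialize(const folly::dynamic& obj) {
    Info info;
    info.extra = obj["extra"].isNull()
        ? nullptr
        : std::make_shared<std::string>(obj["extra"].asString());
    return info;
  }
};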
diff --git a/velox/exec/CMakeLists.txt b/velox/exec/CMakeLists.txt
index 9972f46b3ecb..9c38c416259d 100644
--- a/velox/exec/CMakeLists.txt
+++ b/velox/exec/CMakeLists.txt
@@ -61,6 +61,8 @@ velox_add_library(
   QueryDataWriter.cpp
   QueryMetadataReader.cpp
   QueryMetadataWriter.cpp
+  QuerySplitReader.cpp
+  QuerySplitWriter.cpp
   QueryTraceConfig.cpp
   QueryTraceScan.cpp
   QueryTraceUtil.cpp
diff --git a/velox/exec/QueryDataWriter.cpp b/velox/exec/QueryDataWriter.cpp
index d8e1ed2e64d5..86ae1bc6d2e9 100644
--- a/velox/exec/QueryDataWriter.cpp
+++ b/velox/exec/QueryDataWriter.cpp
@@ -26,15 +26,16 @@
 namespace facebook::velox::exec::trace {
 
 QueryDataWriter::QueryDataWriter(
-    std::string path,
+    std::string traceDir,
     memory::MemoryPool* pool,
     UpdateAndCheckTraceLimitCB updateAndCheckTraceLimitCB)
-    : dirPath_(std::move(path)),
-      fs_(filesystems::getFileSystem(dirPath_, nullptr)),
+    : traceDir_(std::move(traceDir)),
+      fs_(filesystems::getFileSystem(traceDir_, nullptr)),
       pool_(pool),
       updateAndCheckTraceLimitCB_(std::move(updateAndCheckTraceLimitCB)) {
+  VELOX_CHECK_NOT_NULL(fs_);
   dataFile_ = fs_->openFileForWrite(
-      fmt::format("{}/{}", dirPath_, QueryTraceTraits::kDataFileName));
+      fmt::format("{}/{}", traceDir_, QueryTraceTraits::kDataFileName));
   VELOX_CHECK_NOT_NULL(dataFile_);
 }
 
@@ -83,7 +84,7 @@ void QueryDataWriter::finish(bool limitExceeded) {
 
 void QueryDataWriter::writeSummary(bool limitExceeded) const {
   const auto summaryFilePath =
-      fmt::format("{}/{}", dirPath_, QueryTraceTraits::kDataSummaryFileName);
+      fmt::format("{}/{}", traceDir_, QueryTraceTraits::kDataSummaryFileName);
   const auto file = fs_->openFileForWrite(summaryFilePath);
   folly::dynamic obj = folly::dynamic::object;
   obj[QueryTraceTraits::kDataTypeKey] = dataType_->serialize();
diff --git a/velox/exec/QueryDataWriter.h b/velox/exec/QueryDataWriter.h
index 8e6073dcd3b7..a126e1859cfa 100644
--- a/velox/exec/QueryDataWriter.h
+++ b/velox/exec/QueryDataWriter.h
@@ -30,11 +30,14 @@ namespace facebook::velox::exec::trace {
 class QueryDataWriter {
  public:
   explicit QueryDataWriter(
-      std::string path,
+      std::string traceDir,
       memory::MemoryPool* pool,
       UpdateAndCheckTraceLimitCB updateAndCheckTraceLimitCB);
 
-  /// Serializes rows and writes out each batch.
+  /// Serializes and writes out each batch, enabling us to replay the execution
+  /// with the same batch numbers and order. Each serialized batch is flushed
+  /// immediately, ensuring that the traced operator can be replayed even if a
+  /// crash occurs during execution.
   void write(const RowVectorPtr& rows);
 
   /// Closes the data file and writes out the data summary.
@@ -49,7 +52,7 @@ class QueryDataWriter {
   // TODO: add more summaries such as number of rows etc.
   void writeSummary(bool limitExceeded = false) const;
 
-  const std::string dirPath_;
+  const std::string traceDir_;
   // TODO: make 'useLosslessTimestamp' configuerable.
   const serializer::presto::PrestoVectorSerde::PrestoOptions options_ = {
       true,
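The immediate-flush behavior promised in the write() comment boils down to appending each serialized batch to the trace file as soon as it is produced. A rough sketch of that pattern, using only the filesystem calls already visible in this patch (the directory and the batch payloads are made up):

#include "velox/common/file/FileSystems.h"

#include <string>
#include <vector>

using namespace facebook::velox;

// Appends one record per batch; every append() reaches the file before the
// next batch is serialized, so a crash mid-query still leaves a readable
// prefix for replay. "/tmp/query_trace" is a hypothetical directory.
void appendBatches(const std::vector<std::string>& serializedBatches) {
  auto fs = filesystems::getFileSystem("/tmp/query_trace", nullptr);
  auto file = fs->openFileForWrite("/tmp/query_trace/trace.data");
  for (const auto& batch : serializedBatches) {
    file->append(batch); // WriteFile::append takes a std::string_view
  }
  file->close();
}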
diff --git a/velox/exec/QuerySplitReader.cpp b/velox/exec/QuerySplitReader.cpp
new file mode 100644
index 000000000000..1c9240fbc753
--- /dev/null
+++ b/velox/exec/QuerySplitReader.cpp
@@ -0,0 +1,101 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "velox/exec/QuerySplitReader.h"
+#include "velox/common/file/FileInputStream.h"
+#include "velox/connectors/hive/HiveConnectorSplit.h"
+#include "velox/exec/QueryTraceTraits.h"
+#include "velox/exec/QueryTraceUtil.h"
+
+using namespace facebook::velox::connector::hive;
+
+namespace facebook::velox::exec::trace {
+
+QuerySplitReader::QuerySplitReader(
+    std::string traceDir,
+    memory::MemoryPool* pool)
+    : traceDir_(std::move(traceDir)),
+      fs_(filesystems::getFileSystem(traceDir_, nullptr)),
+      pool_(pool),
+      splitInfoStream_(getSplitInputStream()) {
+  VELOX_CHECK_NOT_NULL(fs_);
+  VELOX_CHECK_NOT_NULL(splitInfoStream_);
+}
+
+std::vector<exec::Split> QuerySplitReader::read() const {
+  const auto splitStrings = getSplitInfos(splitInfoStream_.get());
+  std::vector<exec::Split> splits;
+  for (const auto& splitString : splitStrings) {
+    folly::dynamic splitInfoObj = folly::parseJson(splitString);
+    const auto split =
+        ISerializable::deserialize<HiveConnectorSplit>(splitInfoObj);
+    splits.emplace_back(
+        std::make_shared<HiveConnectorSplit>(
+            split->connectorId,
+            split->filePath,
+            split->fileFormat,
+            split->start,
+            split->length,
+            split->partitionKeys,
+            split->tableBucketNumber,
+            split->customSplitInfo,
+            split->extraFileInfo,
+            split->serdeParameters,
+            split->splitWeight,
+            split->infoColumns,
+            split->properties),
+        -1);
+  }
+  return splits;
+}
+
+std::unique_ptr<common::FileInputStream> QuerySplitReader::getSplitInputStream()
+    const {
+  auto splitInfoFile = fs_->openFileForRead(
+      fmt::format("{}/{}", traceDir_, QueryTraceTraits::kSplitInfoFileName));
+  // TODO: Make the buffer size configurable.
+  return std::make_unique<common::FileInputStream>(
+      std::move(splitInfoFile), 1 << 20, pool_);
+}
+
+// static
+std::vector<std::string> QuerySplitReader::getSplitInfos(
+    common::FileInputStream* stream) {
+  std::vector<std::string> splits;
+  try {
+    while (!stream->atEnd()) {
+      const auto length = stream->read<uint32_t>();
+      std::string splitInfoString(length, '\0');
+      stream->readBytes(
+          reinterpret_cast<uint8_t*>(splitInfoString.data()), length);
+      const auto crc32 = stream->read<uint32_t>();
+      const auto actualCrc32 = folly::crc32(
+          reinterpret_cast<const uint8_t*>(splitInfoString.data()),
+          splitInfoString.size());
+      if (crc32 != actualCrc32) {
+        LOG(ERROR) << "Failed to verify the split checksum: expected " << crc32
+                   << " but got " << actualCrc32;
+        break;
+      }
+      splits.push_back(std::move(splitInfoString));
+    }
+  } catch (const VeloxException& e) {
+    LOG(ERROR) << "Failed to deserialize a split string from the stream: "
+               << e.message();
+  }
+  return splits;
+}
+} // namespace facebook::velox::exec::trace
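Each record in trace.split is framed as a little-endian uint32 length, the JSON payload, and a little-endian uint32 crc32 of the payload. The following is a standalone sketch of decoding and verifying one such record with plain folly, independent of FileInputStream; the function name is illustrative:

#include <folly/hash/Checksum.h>
#include <folly/io/Cursor.h>
#include <folly/io/IOBuf.h>

#include <optional>
#include <string>

// Decodes one [length][payload][crc32] record and returns the payload only if
// the checksum matches, mirroring what getSplitInfos() does per record.
std::optional<std::string> decodeRecord(const folly::IOBuf& buf) {
  folly::io::Cursor cursor(&buf);
  const auto length = cursor.readLE<uint32_t>();
  std::string payload = cursor.readFixedString(length);
  const auto expectedCrc32 = cursor.readLE<uint32_t>();
  const auto actualCrc32 = folly::crc32(
      reinterpret_cast<const uint8_t*>(payload.data()), payload.size());
  // A mismatch indicates a truncated or corrupted tail; the reader stops
  // there instead of failing the whole replay.
  if (expectedCrc32 != actualCrc32) {
    return std::nullopt;
  }
  return payload;
}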
diff --git a/velox/exec/QuerySplitReader.h b/velox/exec/QuerySplitReader.h
new file mode 100644
index 000000000000..c45024e9d374
--- /dev/null
+++ b/velox/exec/QuerySplitReader.h
@@ -0,0 +1,50 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include
+#include "velox/common/file/FileInputStream.h"
+#include "velox/common/file/FileSystems.h"
+#include "velox/exec/Split.h"
+
+namespace facebook::velox::exec::trace {
+/// Used to load the input splits recorded by a traced 'TableScan' operator so
+/// that they can be fed back when replaying that 'TableScan'.
+///
+/// Currently, it only works with 'HiveConnectorSplit'. In the future, it will
+/// be extended to handle more types of splits, such as
+/// 'IcebergHiveConnectorSplit'.
+class QuerySplitReader {
+ public:
+  explicit QuerySplitReader(std::string traceDir, memory::MemoryPool* pool);
+
+  /// Reads the serialized split records from 'splitInfoStream_' and returns
+  /// all the correctly traced splits, i.e. those whose checksum verifies.
+  std::vector<exec::Split> read() const;
+
+ private:
+  static std::vector<std::string> getSplitInfos(
+      common::FileInputStream* stream);
+
+  std::unique_ptr<common::FileInputStream> getSplitInputStream() const;
+
+  const std::string traceDir_;
+  const std::shared_ptr<filesystems::FileSystem> fs_;
+  memory::MemoryPool* const pool_;
+  const std::unique_ptr<common::FileInputStream> splitInfoStream_;
+};
+} // namespace facebook::velox::exec::trace
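A hypothetical replay-side use of this class; the trace directory name is made up, and the pool mirrors the tracePool() that the tests below use:

#include "velox/common/memory/Memory.h"
#include "velox/exec/QuerySplitReader.h"

#include <string>
#include <vector>

using namespace facebook::velox;

// Loads whatever splits were captured for one traced TableScan. Only records
// whose checksum verified are returned; a corrupted tail is dropped.
std::vector<exec::Split> loadTracedSplits(const std::string& traceDir) {
  exec::trace::QuerySplitReader reader(
      traceDir, memory::MemoryManager::getInstance()->tracePool());
  return reader.read();
}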
diff --git a/velox/exec/QuerySplitWriter.cpp b/velox/exec/QuerySplitWriter.cpp
new file mode 100644
index 000000000000..e9bd4ebcf7ec
--- /dev/null
+++ b/velox/exec/QuerySplitWriter.cpp
@@ -0,0 +1,75 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "velox/exec/QuerySplitWriter.h"
+#include "velox/connectors/hive/HiveConnectorSplit.h"
+#include "velox/exec/QueryTraceTraits.h"
+#include "velox/exec/QueryTraceUtil.h"
+
+using namespace facebook::velox::connector::hive;
+
+namespace facebook::velox::exec::trace {
+/// Used to record the input splits fed to a traced 'TableScan' operator so
+/// that they can be fetched back when replaying that 'TableScan'.
+///
+/// Currently, it only works with 'HiveConnectorSplit'. In the future, it will
+/// be extended to handle more types of splits, such as
+/// 'IcebergHiveConnectorSplit'.
+QuerySplitWriter::QuerySplitWriter(std::string traceDir)
+    : traceDir_(std::move(traceDir)),
+      fs_(filesystems::getFileSystem(traceDir_, nullptr)) {
+  VELOX_CHECK_NOT_NULL(fs_);
+  splitInfoFile_ = fs_->openFileForWrite(
+      fmt::format("{}/{}", traceDir_, QueryTraceTraits::kSplitInfoFileName));
+  VELOX_CHECK_NOT_NULL(splitInfoFile_);
+}
+
+void QuerySplitWriter::write(const exec::Split& split) const {
+  VELOX_CHECK(!split.hasGroup(), "Grouped execution is not supported");
+  VELOX_CHECK(split.hasConnectorSplit());
+  const auto splitObj = split.connectorSplit->serialize();
+  const auto splitJson = folly::toJson(splitObj);
+  auto ioBuf = appendToBuffer(splitJson);
+  splitInfoFile_->append(std::move(ioBuf));
+}
+
+void QuerySplitWriter::finish() {
+  if (finished_) {
+    return;
+  }
+
+  VELOX_CHECK_NOT_NULL(
+      splitInfoFile_, "The query split writer has already been finished");
+  splitInfoFile_->close();
+  splitInfoFile_.reset();
+  finished_ = true;
+}
+
+// static
+std::unique_ptr<folly::IOBuf> QuerySplitWriter::appendToBuffer(
+    const std::string& split) {
+  const uint32_t length = split.length();
+  const uint32_t crc32 = folly::crc32(
+      reinterpret_cast<const uint8_t*>(split.data()), split.size());
+  auto ioBuf =
+      folly::IOBuf::create(sizeof(length) + split.size() + sizeof(crc32));
+  folly::io::Appender appender(ioBuf.get(), 0);
+  appender.writeLE(length);
+  appender.push(reinterpret_cast<const uint8_t*>(split.data()), length);
+  appender.writeLE(crc32);
+  return ioBuf;
+}
+} // namespace facebook::velox::exec::trace
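The matching tracing-side usage, again only a sketch with a made-up directory: record each split as it is handed to the traced TableScan, then seal the file.

#include "velox/exec/QuerySplitWriter.h"

#include <string>
#include <vector>

using namespace facebook::velox;

void traceSplits(
    const std::vector<exec::Split>& splits,
    const std::string& traceDir) {
  exec::trace::QuerySplitWriter writer(traceDir);
  for (const auto& split : splits) {
    // Each split is serialized to JSON and framed with a length prefix and a
    // crc32 suffix before being appended to trace.split.
    writer.write(split);
  }
  writer.finish(); // closes the file; no further writes are expected
}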
diff --git a/velox/exec/QuerySplitWriter.h b/velox/exec/QuerySplitWriter.h
new file mode 100644
index 000000000000..4b1ad3ede47f
--- /dev/null
+++ b/velox/exec/QuerySplitWriter.h
@@ -0,0 +1,49 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "velox/common/file/FileSystems.h"
+#include "velox/exec/Split.h"
+
+#include <folly/io/IOBuf.h>
+
+namespace facebook::velox::exec::trace {
+/// Used to write the input splits during the execution of a traced 'TableScan'
+/// operator.
+///
+/// Currently, it only works with 'HiveConnectorSplit'. In the future, it will
+/// be extended to handle more types of splits, such as
+/// 'IcebergHiveConnectorSplit'.
+class QuerySplitWriter {
+ public:
+  explicit QuerySplitWriter(std::string traceDir);
+
+  /// Serializes and writes out each split. Each serialized split is
+  /// immediately flushed to ensure that we can still replay a traced operator
+  /// even if a crash occurs during execution.
+  void write(const exec::Split& split) const;
+
+  void finish();
+
+  static std::unique_ptr<folly::IOBuf> appendToBuffer(
+      const std::string& split);
+
+  const std::string traceDir_;
+  const std::shared_ptr<filesystems::FileSystem> fs_;
+  std::unique_ptr<WriteFile> splitInfoFile_;
+  bool finished_{false};
+};
+} // namespace facebook::velox::exec::trace
diff --git a/velox/exec/QueryTraceTraits.h b/velox/exec/QueryTraceTraits.h
index ad817115b490..8c5100d38017 100644
--- a/velox/exec/QueryTraceTraits.h
+++ b/velox/exec/QueryTraceTraits.h
@@ -31,5 +31,6 @@ struct QueryTraceTraits {
   static inline const std::string kQueryMetaFileName = "query_meta.json";
   static inline const std::string kDataSummaryFileName = "data_summary.json";
   static inline const std::string kDataFileName = "trace.data";
+  static inline const std::string kSplitInfoFileName = "trace.split";
 };
 } // namespace facebook::velox::exec::trace
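With the new constant, QueryTraceTraits now names four trace artifacts: query_meta.json, data_summary.json, trace.data, and trace.split. A trivial sketch of resolving the split file path the same way the writer and reader above do (the helper name is made up):

#include <fmt/format.h>

#include <string>

#include "velox/exec/QueryTraceTraits.h"

using namespace facebook::velox::exec::trace;

// Returns "<traceDir>/trace.split", next to trace.data and data_summary.json.
std::string splitFilePath(const std::string& traceDir) {
  return fmt::format("{}/{}", traceDir, QueryTraceTraits::kSplitInfoFileName);
}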
diff --git a/velox/exec/tests/QueryTraceTest.cpp b/velox/exec/tests/QueryTraceTest.cpp
index 0f7671672214..dbb0853029d9 100644
--- a/velox/exec/tests/QueryTraceTest.cpp
+++ b/velox/exec/tests/QueryTraceTest.cpp
@@ -20,11 +20,14 @@
 #include
 
 #include "velox/common/file/FileSystems.h"
+#include "velox/connectors/hive/HiveConnectorSplit.h"
 #include "velox/exec/PartitionFunction.h"
 #include "velox/exec/QueryDataReader.h"
 #include "velox/exec/QueryDataWriter.h"
 #include "velox/exec/QueryMetadataReader.h"
 #include "velox/exec/QueryMetadataWriter.h"
+#include "velox/exec/QuerySplitReader.h"
+#include "velox/exec/QuerySplitWriter.h"
 #include "velox/exec/QueryTraceUtil.h"
 #include "velox/exec/tests/utils/ArbitratorTestUtil.h"
 #include "velox/exec/tests/utils/HiveConnectorTestBase.h"
@@ -50,6 +53,7 @@ class QueryTracerTest : public HiveConnectorTestBase {
     connector::hive::LocationHandle::registerSerDe();
     connector::hive::HiveColumnHandle::registerSerDe();
     connector::hive::HiveInsertTableHandle::registerSerDe();
+    connector::hive::HiveConnectorSplit::registerSerDe();
     core::PlanNode::registerSerDe();
     core::ITypedExpr::registerSerDe();
     registerPartitionFunctionSerDe();
@@ -247,6 +251,91 @@ TEST_F(QueryTracerTest, traceMetadata) {
   }
 }
 
+TEST_F(QueryTracerTest, traceSplit) {
+  constexpr auto numSplits = 13;
+  std::vector<exec::Split> splits;
+  for (int i = 0; i < numSplits; ++i) {
+    auto builder = HiveConnectorSplitBuilder(fmt::format("path-{}-{}", i, i));
+    const auto key = fmt::format("k{}", i);
+    const auto value = fmt::format("v{}", i);
+    splits.emplace_back(
+        builder.start(i)
+            .length(i)
+            .connectorId(fmt::format("{}", i))
+            .fileFormat(dwio::common::FileFormat(i + 1))
+            .infoColumn(key, value)
+            .partitionKey(
+                key, i > 1 ? std::nullopt : std::optional(value))
+            .tableBucketNumber(i)
+            .build(),
+        -1);
+  }
+
+  enum class TestMode { kNormal = 0, kPartial = 1, kChecksum = 2 };
+
+  const auto testDir = TempDirectoryPath::create();
+  const auto fs = filesystems::getFileSystem(testDir->getPath(), nullptr);
+  const std::unordered_map<TestMode, std::string> traceDirs{
+      {TestMode::kNormal, fmt::format("{}/normal", testDir->getPath())},
+      {TestMode::kPartial, fmt::format("{}/partial", testDir->getPath())},
+      {TestMode::kChecksum, fmt::format("{}/checksum", testDir->getPath())}};
+  const std::string split = "deadbeaf";
+  const uint32_t length = split.length();
+  const uint32_t crc32 = folly::crc32(
+      reinterpret_cast<const uint8_t*>(split.data()), split.size());
+  struct {
+    TestMode testMode;
+    uint32_t ioBufSize;
+
+    std::string debugString() const {
+      return fmt::format(
+          "Test mode: {}, ioBufSize: {}",
+          static_cast<int>(testMode),
+          ioBufSize);
+    }
+  } testSettings[]{
+      {TestMode::kNormal, 0},
+      {TestMode::kPartial, 4 + length},
+      {TestMode::kChecksum, 4 + length + 4}};
+
+  for (const auto& testData : testSettings) {
+    SCOPED_TRACE(testData.debugString());
+    fs->mkdir(traceDirs.at(testData.testMode));
+    const auto& traceDir = traceDirs.at(testData.testMode);
+    auto writer = exec::trace::QuerySplitWriter(traceDir);
+    for (int i = 0; i < numSplits; ++i) {
+      writer.write(splits.at(i));
+    }
+    writer.finish();
+
+    if (testData.testMode != TestMode::kNormal) {
+      const auto splitInfoFile = fs->openFileForWrite(
+          fmt::format("{}/{}", traceDir, QueryTraceTraits::kSplitInfoFileName),
+          filesystems::FileOptions{.shouldThrowOnFileAlreadyExists = false});
+      auto ioBuf = folly::IOBuf::create(testData.ioBufSize);
+      folly::io::Appender appender(ioBuf.get(), 0);
+      appender.writeLE(length);
+      appender.push(reinterpret_cast<const uint8_t*>(split.data()), length);
+      if (testData.testMode == TestMode::kChecksum) {
+        appender.writeLE(crc32 - 1);
+      }
+      splitInfoFile->append(std::move(ioBuf));
+      splitInfoFile->close();
+    }
+
+    const auto reader = exec::trace::QuerySplitReader(
+        traceDir, memory::MemoryManager::getInstance()->tracePool());
+    auto actualSplits = reader.read();
+    for (int i = 0; i < numSplits; ++i) {
+      ASSERT_FALSE(actualSplits[i].hasGroup());
+      ASSERT_TRUE(actualSplits[i].hasConnectorSplit());
+      const auto actualConnectorSplit = actualSplits[i].connectorSplit;
+      const auto expectedConnectorSplit = splits[i].connectorSplit;
+      ASSERT_EQ(
+          actualConnectorSplit->toString(), expectedConnectorSplit->toString());
+    }
+  }
+}
+
 TEST_F(QueryTracerTest, task) {
   const auto rowType = ROW({"c0", "c1", "c2", "c3", "c4", "c5"},