From a1afdddd3c1197770833dbd0b8836e088a37fd19 Mon Sep 17 00:00:00 2001 From: Pratik Joseph Dabre Date: Wed, 2 Oct 2024 10:24:05 -0700 Subject: [PATCH] [native] Add TPC-DS connector Co-authored-by: Pramod Satya --- presto-docs/src/main/sphinx/presto-cpp.rst | 4 +- .../facebook/presto/hive/HiveQueryRunner.java | 5 +- .../etc/catalog/tpcds.properties | 1 + .../presto_cpp/main/CMakeLists.txt | 3 +- .../presto_cpp/main/PrestoServer.cpp | 2 + .../presto_cpp/main/connectors/CMakeLists.txt | 15 + .../main/connectors/tpcds/CMakeLists.txt | 51 + .../main/connectors/tpcds/DSDGenIterator.cpp | 98 ++ .../main/connectors/tpcds/DSDGenIterator.h | 74 ++ .../main/connectors/tpcds/TpcdsConnector.cpp | 139 +++ .../main/connectors/tpcds/TpcdsConnector.h | 180 ++++ .../connectors/tpcds/TpcdsConnectorSplit.h | 66 ++ .../main/connectors/tpcds/TpcdsGen.cpp | 945 ++++++++++++++++++ .../main/connectors/tpcds/TpcdsGen.h | 99 ++ .../connectors/tpcds/utils/append_info-c.cpp | 158 +++ .../connectors/tpcds/utils/append_info-c.h | 38 + .../presto_cpp/main/tests/CMakeLists.txt | 1 + .../main/types/PrestoToVeloxConnector.cpp | 102 +- .../main/types/PrestoToVeloxConnector.h | 26 + .../main/types/tests/CMakeLists.txt | 3 + .../presto_protocol/ConnectorProtocol.h | 11 + .../presto_protocol/presto_protocol.cpp | 203 ++++ .../presto_protocol/presto_protocol.h | 80 ++ .../presto_protocol/presto_protocol.yml | 12 + .../ConnectorTransactionHandle.cpp.inc | 1 + .../special/TpcdsTransactionHandle.cpp.inc | 30 + .../special/TpcdsTransactionHandle.hpp.inc | 28 + ...stractTestNativeTpcdsConnectorQueries.java | 83 ++ .../PrestoNativeQueryRunnerUtils.java | 4 + ...TestPrestoNativeTpcdsConnectorQueries.java | 35 + .../presto/tpcds/TpcdsSplitManager.java | 25 +- 31 files changed, 2489 insertions(+), 33 deletions(-) create mode 100644 presto-native-execution/etc/catalog/tpcds.properties create mode 100644 presto-native-execution/presto_cpp/main/connectors/CMakeLists.txt create mode 100644 
presto-native-execution/presto_cpp/main/connectors/tpcds/CMakeLists.txt create mode 100644 presto-native-execution/presto_cpp/main/connectors/tpcds/DSDGenIterator.cpp create mode 100644 presto-native-execution/presto_cpp/main/connectors/tpcds/DSDGenIterator.h create mode 100644 presto-native-execution/presto_cpp/main/connectors/tpcds/TpcdsConnector.cpp create mode 100644 presto-native-execution/presto_cpp/main/connectors/tpcds/TpcdsConnector.h create mode 100644 presto-native-execution/presto_cpp/main/connectors/tpcds/TpcdsConnectorSplit.h create mode 100644 presto-native-execution/presto_cpp/main/connectors/tpcds/TpcdsGen.cpp create mode 100644 presto-native-execution/presto_cpp/main/connectors/tpcds/TpcdsGen.h create mode 100644 presto-native-execution/presto_cpp/main/connectors/tpcds/utils/append_info-c.cpp create mode 100644 presto-native-execution/presto_cpp/main/connectors/tpcds/utils/append_info-c.h create mode 100644 presto-native-execution/presto_cpp/presto_protocol/special/TpcdsTransactionHandle.cpp.inc create mode 100644 presto-native-execution/presto_cpp/presto_protocol/special/TpcdsTransactionHandle.hpp.inc create mode 100644 presto-native-execution/src/test/java/com/facebook/presto/nativeworker/AbstractTestNativeTpcdsConnectorQueries.java create mode 100644 presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestPrestoNativeTpcdsConnectorQueries.java diff --git a/presto-docs/src/main/sphinx/presto-cpp.rst b/presto-docs/src/main/sphinx/presto-cpp.rst index 0f9ed62a12ced..3e7b5d5fd8903 100644 --- a/presto-docs/src/main/sphinx/presto-cpp.rst +++ b/presto-docs/src/main/sphinx/presto-cpp.rst @@ -49,4 +49,6 @@ Only specific connectors are supported in the Presto C++ evaluation engine. * Iceberg connector supports both V1 and V2 tables, including tables with delete files. -* TPCH connector, with ``tpch.naming=standard`` catalog property. \ No newline at end of file +* TPCH connector, with ``tpch.naming=standard`` catalog property. 
+ +* TPCDS connector. \ No newline at end of file diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/HiveQueryRunner.java b/presto-hive/src/test/java/com/facebook/presto/hive/HiveQueryRunner.java index a6e842b29b627..4a7488ff23d34 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/HiveQueryRunner.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/HiveQueryRunner.java @@ -210,7 +210,10 @@ public static DistributedQueryRunner createQueryRunner( queryRunner.installPlugin(new TpcdsPlugin()); queryRunner.installPlugin(new TestingHiveEventListenerPlugin()); queryRunner.createCatalog("tpch", "tpch"); - queryRunner.createCatalog("tpcds", "tpcds"); + Map tpcdsProperties = ImmutableMap.builder() + .put("tpcds.toggle-char-to-varchar", "true") + .build(); + queryRunner.createCatalog("tpcds", "tpcds", tpcdsProperties); Map tpchProperties = ImmutableMap.builder() .put("tpch.column-naming", "standard") .build(); diff --git a/presto-native-execution/etc/catalog/tpcds.properties b/presto-native-execution/etc/catalog/tpcds.properties new file mode 100644 index 0000000000000..ba8147db14c71 --- /dev/null +++ b/presto-native-execution/etc/catalog/tpcds.properties @@ -0,0 +1 @@ +connector.name=tpcds diff --git a/presto-native-execution/presto_cpp/main/CMakeLists.txt b/presto-native-execution/presto_cpp/main/CMakeLists.txt index 30ba84dc5461d..17c03d26821b2 100644 --- a/presto-native-execution/presto_cpp/main/CMakeLists.txt +++ b/presto-native-execution/presto_cpp/main/CMakeLists.txt @@ -14,6 +14,7 @@ add_subdirectory(types) add_subdirectory(http) add_subdirectory(common) add_subdirectory(thrift) +add_subdirectory(connectors) add_library( presto_server_lib @@ -93,7 +94,7 @@ add_executable(presto_server PrestoMain.cpp) # "undefined reference to `vtable for velox::connector::tpch::TpchTableHandle`" # TODO: Fix these errors. 
target_link_libraries(presto_server presto_server_lib velox_hive_connector - velox_tpch_connector) + velox_tpch_connector presto_tpcds_connector) if(PRESTO_ENABLE_REMOTE_FUNCTIONS) add_library(presto_server_remote_function JsonSignatureParser.cpp diff --git a/presto-native-execution/presto_cpp/main/PrestoServer.cpp b/presto-native-execution/presto_cpp/main/PrestoServer.cpp index 3c919b7448927..12b08faeb0e59 100644 --- a/presto-native-execution/presto_cpp/main/PrestoServer.cpp +++ b/presto-native-execution/presto_cpp/main/PrestoServer.cpp @@ -269,6 +269,8 @@ void PrestoServer::run() { std::make_unique("system")); registerPrestoToVeloxConnector( std::make_unique("$system@system")); + registerPrestoToVeloxConnector( + std::make_unique("tpcds")); initializeVeloxMemory(); initializeThreadPools(); diff --git a/presto-native-execution/presto_cpp/main/connectors/CMakeLists.txt b/presto-native-execution/presto_cpp/main/connectors/CMakeLists.txt new file mode 100644 index 0000000000000..46f0608b8e9b1 --- /dev/null +++ b/presto-native-execution/presto_cpp/main/connectors/CMakeLists.txt @@ -0,0 +1,15 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +add_subdirectory(tpcds) diff --git a/presto-native-execution/presto_cpp/main/connectors/tpcds/CMakeLists.txt b/presto-native-execution/presto_cpp/main/connectors/tpcds/CMakeLists.txt new file mode 100644 index 0000000000000..d6738c90eb836 --- /dev/null +++ b/presto-native-execution/presto_cpp/main/connectors/tpcds/CMakeLists.txt @@ -0,0 +1,51 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +cmake_policy(SET CMP0079 NEW) + +project(TPCDS) + +add_library(presto_tpcds_connector OBJECT TpcdsConnector.cpp) +target_link_libraries(presto_tpcds_connector velox_connector tpcds_gen fmt::fmt) + +# Without this hack, there are multiple link errors similar to the one below +# only on GCC. "undefined reference to `vtable for +# velox::connector::tpcds::TpcdsTableHandle`. TODO: Fix this hack. +target_link_libraries(velox_exec_test_lib presto_tpcds_connector) + +if(CMAKE_CXX_COMPILER_ID MATCHES "Clang") + add_compile_options(-Wno-deprecated-declarations -Wno-writable-strings + -Wno-missing-field-initializers) +endif() + +# This stringop-overflow warning seems to have lots of false positives and has +# been the source of a lot of compiler bug reports (e.g. +# https://gcc.gnu.org/bugzilla/show_bug.cgi?id=99578), which causes +# parquet-amalgamation.cpp to fail to compile. For now, we disable this warning +# on the affected compiler (GCC). 
+if(CMAKE_CXX_COMPILER_ID MATCHES "GNU") + add_compile_options(-Wno-stringop-overflow -Wno-write-strings) +endif() + +# Add subdirectories +add_subdirectory(${CMAKE_SOURCE_DIR}/presto_cpp/external/dsdgen/dsdgen-c build) + +add_library(append_info OBJECT utils/append_info-c.cpp) +target_link_libraries(append_info velox_vector_test_lib Folly::folly xsimd) +target_link_libraries(dsdgen_c append_info) + +add_library(tpcds_gen TpcdsGen.cpp DSDGenIterator.cpp) +target_include_directories(tpcds_gen PUBLIC dsdgen/include) +target_link_libraries(tpcds_gen velox_memory velox_vector dsdgen_c append_info + fmt::fmt) diff --git a/presto-native-execution/presto_cpp/main/connectors/tpcds/DSDGenIterator.cpp b/presto-native-execution/presto_cpp/main/connectors/tpcds/DSDGenIterator.cpp new file mode 100644 index 0000000000000..9531afb987990 --- /dev/null +++ b/presto-native-execution/presto_cpp/main/connectors/tpcds/DSDGenIterator.cpp @@ -0,0 +1,98 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "presto_cpp/main/connectors/tpcds/DSDGenIterator.h" +#include "presto_cpp/external/dsdgen/include/dsdgen-c/dist.h" +#include "presto_cpp/external/dsdgen/include/dsdgen-c/genrand.h" +#include "presto_cpp/external/dsdgen/include/dsdgen-c/parallel.h" +#include "presto_cpp/external/dsdgen/include/dsdgen-c/params.h" +#include "presto_cpp/external/dsdgen/include/dsdgen-c/scaling.h" +#include "presto_cpp/external/dsdgen/include/dsdgen-c/tdefs.h" +#include "velox/common/base/Exceptions.h" + +using namespace facebook::velox; + +namespace facebook::presto::connector::tpcds { + +void initializeDSDgen( + double scale, + int32_t parallel, + int32_t child, + DSDGenContext& dsdGenContext) { + dsdGenContext.Reset(); + resetCountCount(); + + std::string scaleStr = std::to_string(scale); + set_str("SCALE", scaleStr.c_str(), dsdGenContext); + std::string parallelStr = std::to_string(parallel); + set_str("PARALLEL", parallelStr.c_str(), dsdGenContext); + std::string childStr = std::to_string(child); + set_str("CHILD", childStr.c_str(), dsdGenContext); + + init_rand(dsdGenContext); // no random numbers without this +} + +std::string getQuery(int query) { + if (query <= 0 || query > TPCDS_QUERIES_COUNT) { + VELOX_USER_FAIL("Invalid TPC-DS query number: {}", query); + } + return TPCDS_QUERIES[query - 1]; +} + +DSDGenIterator::DSDGenIterator( + double scaleFactor, + int32_t parallel, + int32_t child) { + tableDefs_.resize(DBGEN_VERSION); // there are 24 TPC-DS tables + VELOX_CHECK_GE(scaleFactor, 0.0, "Tpcds scale factor must be non-negative"); + initializeDSDgen(scaleFactor, parallel, child, dsdgenCtx_); +} + +void DSDGenIterator::initializeTable( + const std::vector& children, + int table_id) { + auto tdef = getSimpleTdefsByNumber(table_id, dsdgenCtx_); + TpcdsTableDef table_def; + table_def.name = tdef->name; + table_def.fl_child = tdef->flags & FL_CHILD ? 1 : 0; + table_def.fl_small = tdef->flags & FL_SMALL ? 
1 : 0; + table_def.first_column = tdef->nFirstColumn; + table_def.children = children; + table_def.dsdGenContext = &dsdgenCtx_; + tableDefs_[table_id] = std::make_unique(table_def); +} + +std::vector>& DSDGenIterator::getTableDefs() { + return tableDefs_; +} + +tpcds_builder_func DSDGenIterator::getTDefFunctionByNumber(int table_id) { + auto table_funcs = getTdefFunctionsByNumber(table_id); + return table_funcs->builder; +} + +void DSDGenIterator::initTableOffset(int32_t table_id, size_t offset) { + row_skip(table_id, offset, dsdgenCtx_); +} +void DSDGenIterator::genRow(int32_t table_id, size_t index) { + auto builder_func = getTDefFunctionByNumber(table_id); + builder_func((void*)&tableDefs_, index, dsdgenCtx_); + row_stop(table_id, dsdgenCtx_); +} + +int64_t DSDGenIterator::getRowCount(int32_t table) { + return get_rowcount(table, dsdgenCtx_); +} + +} // namespace facebook::presto::connector::tpcds diff --git a/presto-native-execution/presto_cpp/main/connectors/tpcds/DSDGenIterator.h b/presto-native-execution/presto_cpp/main/connectors/tpcds/DSDGenIterator.h new file mode 100644 index 0000000000000..512d92de6a36c --- /dev/null +++ b/presto-native-execution/presto_cpp/main/connectors/tpcds/DSDGenIterator.h @@ -0,0 +1,74 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once + +#include + +#include "presto_cpp/external/dsdgen/include/dsdgen-c/dist.h" +#include "presto_cpp/external/dsdgen/include/tpcds_constants.hpp" +#include "presto_cpp/main/connectors/tpcds/utils/append_info-c.h" + +using namespace facebook::velox; +namespace facebook::presto::connector::tpcds { + +typedef int64_t ds_key_t; + +typedef int (*tpcds_builder_func)(void*, ds_key_t, DSDGenContext& dsdgenCtx); + +void initializeDSDgen( + double scale, + int32_t parallel, + int32_t child, + DSDGenContext& dsdGenContext); + +std::string getQuery(int query); + +/// This class exposes a thread-safe and reproducible iterator over TPC-DS +/// synthetically generated data, backed by DSDGEN. +class DSDGenIterator { + public: + explicit DSDGenIterator(double scaleFactor, int32_t parallel, int32_t child); + + /// Initializes the table definition and the table schema. + void initializeTable(const std::vector& children, int table); + + /// Returns a vector of all the table definitions. + std::vector>& getTableDefs(); + + // Before generating records using the gen*() functions below, call the + // initTableOffset(int32_t table_id, size_t offset) function to correctly + // initialize the seed given the offset to be generated. + // table_id corresponds to the table that needs to be generated and offset + // specifies the number of rows to skip before using the gen*() functions. + void initTableOffset(int32_t table_id, size_t offset); + + /// Generate different types of records. + // table_id corresponds to the table that is to be generated and row is the + // row to be generated. + void genRow(int32_t table_id, size_t row); + + /// Gets the row count for a table. + ds_key_t getRowCount(int32_t table_id); + + // Gets the metadata for a table, which hold information about the mk_*() + // functions responsible for generating the data. 
+ tpcds_builder_func getTDefFunctionByNumber(int table_id); + + protected: + DSDGenContext dsdgenCtx_; + std::vector> tableDefs_; +}; + +} // namespace facebook::presto::connector::tpcds diff --git a/presto-native-execution/presto_cpp/main/connectors/tpcds/TpcdsConnector.cpp b/presto-native-execution/presto_cpp/main/connectors/tpcds/TpcdsConnector.cpp new file mode 100644 index 0000000000000..6c318833e9267 --- /dev/null +++ b/presto-native-execution/presto_cpp/main/connectors/tpcds/TpcdsConnector.cpp @@ -0,0 +1,139 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "presto_cpp/main/connectors/tpcds/TpcdsConnector.h" +#include "presto_cpp/main/connectors/tpcds/DSDGenIterator.h" +#include "presto_cpp/main/connectors/tpcds/TpcdsGen.h" + +using namespace ::facebook::velox; +namespace facebook::presto::connector::tpcds { + +using facebook::presto::connector::tpcds::Table; + +std::string TpcdsTableHandle::toString() const { + return fmt::format( + "table: {}, scale factor: {}", toTableName(table_), scaleFactor_); +} + +TpcdsDataSource::TpcdsDataSource( + const std::shared_ptr& outputType, + const std::shared_ptr& tableHandle, + const std::unordered_map< + std::string, + std::shared_ptr>& columnHandles, + velox::memory::MemoryPool* FOLLY_NONNULL pool) + : pool_(pool) { + auto tpcdsTableHandle = + std::dynamic_pointer_cast(tableHandle); + VELOX_CHECK_NOT_NULL( + tpcdsTableHandle, "TableHandle must be an instance of TpcdsTableHandle"); + table_ = tpcdsTableHandle->getTpcdsTable(); + scaleFactor_ = tpcdsTableHandle->getScaleFactor(); + DSDGenIterator dsdGenIterator(scaleFactor_, 1, 1); + rowCount_ = dsdGenIterator.getRowCount(static_cast(table_)); + + auto tpcdsTableSchema = getTableSchema(tpcdsTableHandle->getTpcdsTable()); + VELOX_CHECK_NOT_NULL(tpcdsTableSchema, "TpcdsSchema can't be null."); + + outputColumnMappings_.reserve(outputType->size()); + + for (const auto& outputName : outputType->names()) { + auto it = columnHandles.find(outputName); + VELOX_CHECK( + it != columnHandles.end(), + "ColumnHandle is missing for output column '{}' on table '{}'", + outputName, + toTableName(table_)); + + auto handle = std::dynamic_pointer_cast(it->second); + VELOX_CHECK_NOT_NULL( + handle, + "ColumnHandle must be an instance of TpcdsColumnHandle " + "for '{}' on table '{}'", + outputName, + toTableName(table_)); + + auto idx = tpcdsTableSchema->getChildIdxIfExists(handle->name()); + VELOX_CHECK( + idx != std::nullopt, + "Column '{}' not found on TPC-DS table '{}'.", + handle->name(), + toTableName(table_)); + 
outputColumnMappings_.emplace_back(*idx); + } + outputType_ = outputType; +} + +RowVectorPtr TpcdsDataSource::projectOutputColumns(RowVectorPtr inputVector) { + std::vector children; + children.reserve(outputColumnMappings_.size()); + + for (const auto channel : outputColumnMappings_) { + children.emplace_back(inputVector->childAt(channel)); + } + + return std::make_shared( + pool_, + outputType_, + BufferPtr(), + inputVector->size(), + std::move(children)); +} + +void TpcdsDataSource::addSplit(std::shared_ptr split) { + VELOX_CHECK_EQ( + currentSplit_, + nullptr, + "Previous split has not been processed yet. Call next() to process the split."); + currentSplit_ = std::dynamic_pointer_cast(split); + VELOX_CHECK(currentSplit_, "Wrong type of split for TpcdsDataSource."); + + size_t partSize = + std::ceil((double)rowCount_ / (double)currentSplit_->totalParts_); + + splitOffset_ = partSize * currentSplit_->partNumber_; + splitEnd_ = splitOffset_ + partSize; +} + +std::optional TpcdsDataSource::next( + uint64_t size, + velox::ContinueFuture& /*future*/) { + VELOX_CHECK_NOT_NULL( + currentSplit_, "No split to process. Call addSplit() first."); + + size_t maxRows = std::min(size, (splitEnd_ - splitOffset_)); + vector_size_t parallel = currentSplit_->totalParts_; + vector_size_t child = currentSplit_->partNumber_; + auto outputVector = genTpcdsData( + table_, maxRows, splitOffset_, pool_, scaleFactor_, parallel, child); + + // If the split is exhausted. + if (!outputVector || outputVector->size() == 0) { + currentSplit_ = nullptr; + return nullptr; + } + + // splitOffset needs to advance based on maxRows passed to genTpcdsData(), and + // not the actual number of returned rows in the output vector, as they can + // differ (presumably for the parent/child tables). 
+ splitOffset_ += maxRows; + completedRows_ += outputVector->size(); + completedBytes_ += outputVector->retainedSize(); + + return projectOutputColumns(outputVector); +} + +VELOX_REGISTER_CONNECTOR_FACTORY(std::make_shared()) + +} // namespace facebook::presto::connector::tpcds diff --git a/presto-native-execution/presto_cpp/main/connectors/tpcds/TpcdsConnector.h b/presto-native-execution/presto_cpp/main/connectors/tpcds/TpcdsConnector.h new file mode 100644 index 0000000000000..6c81e58e5e7b3 --- /dev/null +++ b/presto-native-execution/presto_cpp/main/connectors/tpcds/TpcdsConnector.h @@ -0,0 +1,180 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "presto_cpp/main/connectors/tpcds/TpcdsConnectorSplit.h" +#include "presto_cpp/main/connectors/tpcds/TpcdsGen.h" +#include "velox/connectors/Connector.h" + +using namespace facebook::velox; +using namespace facebook::velox::connector; + +namespace facebook::presto::connector::tpcds { + +class TpcdsConnector; + +// TPC-DS column handle only needs the column name (all columns are generated in +// the same way). +class TpcdsColumnHandle : public velox::connector::ColumnHandle { + public: + explicit TpcdsColumnHandle(const std::string& name) : name_(name) {} + + const std::string& name() const { + return name_; + } + + private: + const std::string name_; +}; + +// TPC-DS table handle uses the underlying enum to describe the target table. 
+class TpcdsTableHandle : public ConnectorTableHandle { + public: + explicit TpcdsTableHandle( + std::string connectorId, + tpcds::Table table, + double scaleFactor = 1.0) + : ConnectorTableHandle(std::move(connectorId)), + table_(table), + scaleFactor_(scaleFactor) { + VELOX_CHECK_GE(scaleFactor, 0.0, "Tpcds scale factor must be non-negative"); + } + + ~TpcdsTableHandle() override {} + + std::string toString() const override; + + tpcds::Table getTpcdsTable() const { + return table_; + } + + double getScaleFactor() const { + return scaleFactor_; + } + + private: + const tpcds::Table table_; + double scaleFactor_; +}; + +class TpcdsDataSource : public velox::connector::DataSource { + public: + TpcdsDataSource( + const std::shared_ptr& outputType, + const std::shared_ptr& + tableHandle, + const std::unordered_map< + std::string, + std::shared_ptr>& columnHandles, + velox::memory::MemoryPool* FOLLY_NONNULL pool); + + void addSplit(std::shared_ptr split) override; + + void addDynamicFilter( + column_index_t /*outputChannel*/, + const std::shared_ptr& /*filter*/) override { + VELOX_NYI("Dynamic filters not supported by TpcdsConnector."); + } + + std::optional next(uint64_t size, velox::ContinueFuture& future) + override; + + uint64_t getCompletedRows() override { + return completedRows_; + } + + uint64_t getCompletedBytes() override { + return completedBytes_; + } + + std::unordered_map runtimeStats() override { + return {}; + } + + private: + RowVectorPtr projectOutputColumns(RowVectorPtr vector); + + tpcds::Table table_; + double scaleFactor_{1.0}; + size_t rowCount_{0}; + RowTypePtr outputType_; + + // Mapping between output columns and their indices (column_index_t) in the + // dsdgen generated datasets. + std::vector outputColumnMappings_; + + std::shared_ptr currentSplit_; + + // Offset of the first row in current split. + uint64_t splitOffset_{0}; + // Offset of the last row in current split. 
+ uint64_t splitEnd_{0}; + + size_t completedRows_{0}; + size_t completedBytes_{0}; + + memory::MemoryPool* FOLLY_NONNULL pool_; +}; + +class TpcdsConnector final : public velox::connector::Connector { + public: + TpcdsConnector( + const std::string& id, + std::shared_ptr config, + folly::Executor* FOLLY_NULLABLE /*executor*/) + : Connector(id) {} + + std::unique_ptr createDataSource( + const std::shared_ptr& outputType, + const std::shared_ptr& tableHandle, + const std::unordered_map< + std::string, + std::shared_ptr>& columnHandles, + ConnectorQueryCtx* FOLLY_NONNULL connectorQueryCtx) override final { + return std::make_unique( + outputType, + tableHandle, + columnHandles, + connectorQueryCtx->memoryPool()); + } + + std::unique_ptr createDataSink( + RowTypePtr /*inputType*/, + std::shared_ptr< + ConnectorInsertTableHandle> /*connectorInsertTableHandle*/, + ConnectorQueryCtx* /*connectorQueryCtx*/, + CommitStrategy /*commitStrategy*/) override final { + VELOX_NYI("TpcdsConnector does not support data sink."); + } +}; + +class TpcdsConnectorFactory : public ConnectorFactory { + public: + static constexpr const char* kTpcdsConnectorName{"tpcds"}; + + TpcdsConnectorFactory() : ConnectorFactory(kTpcdsConnectorName) {} + + explicit TpcdsConnectorFactory(const char* connectorName) + : ConnectorFactory(connectorName) {} + + std::shared_ptr newConnector( + const std::string& id, + std::shared_ptr config, + folly::Executor* executor = nullptr) override { + return std::make_shared(id, config, executor); + } +}; + +} // namespace facebook::presto::connector::tpcds diff --git a/presto-native-execution/presto_cpp/main/connectors/tpcds/TpcdsConnectorSplit.h b/presto-native-execution/presto_cpp/main/connectors/tpcds/TpcdsConnectorSplit.h new file mode 100644 index 0000000000000..055f8d3569fa9 --- /dev/null +++ b/presto-native-execution/presto_cpp/main/connectors/tpcds/TpcdsConnectorSplit.h @@ -0,0 +1,66 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); 
+ * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include "velox/connectors/Connector.h" + +using namespace facebook::velox; + +namespace facebook::presto::connector::tpcds { + +struct TpcdsConnectorSplit : public velox::connector::ConnectorSplit { + explicit TpcdsConnectorSplit( + const std::string& connectorId, + const vector_size_t totalParts = 1, + const vector_size_t partNumber = 0) + : velox::connector::ConnectorSplit(connectorId), + totalParts_(totalParts), + partNumber_(partNumber) { + VELOX_CHECK_GE(totalParts, 1, "totalParts must be >= 1"); + VELOX_CHECK_GT(totalParts, partNumber, "totalParts must be > partNumber"); + } + + // In how many parts the generated TPC-DS table will be segmented, roughly + // `rowCount / totalParts` + const vector_size_t totalParts_{1}; + + // Which of these parts will be read by this split. 
+ const vector_size_t partNumber_{0}; +}; + +} // namespace facebook::presto::connector::tpcds + +template <> +struct fmt::formatter + : formatter { + auto format( + facebook::presto::connector::tpcds::TpcdsConnectorSplit s, + format_context& ctx) const { + return formatter::format(s.toString(), ctx); + } +}; + +template <> +struct fmt::formatter< + std::shared_ptr> + : formatter { + auto format( + std::shared_ptr + s, + format_context& ctx) const { + return formatter::format(s->toString(), ctx); + } +}; diff --git a/presto-native-execution/presto_cpp/main/connectors/tpcds/TpcdsGen.cpp b/presto-native-execution/presto_cpp/main/connectors/tpcds/TpcdsGen.cpp new file mode 100644 index 0000000000000..9adb8e643e5f2 --- /dev/null +++ b/presto-native-execution/presto_cpp/main/connectors/tpcds/TpcdsGen.cpp @@ -0,0 +1,945 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "presto_cpp/main/connectors/tpcds/TpcdsGen.h" +#include "presto_cpp/main/connectors/tpcds/DSDGenIterator.h" +#include "velox/vector/ComplexVector.h" + +using namespace facebook::velox; +namespace facebook::presto::connector::tpcds { + +namespace { +size_t getVectorSize(size_t rowCount, size_t maxRows, size_t offset) { + if (offset >= rowCount) { + return 0; + } + return std::min(rowCount - offset, maxRows); +} + +std::vector allocateVectors( + const RowTypePtr& type, + size_t vectorSize, + memory::MemoryPool* pool) { + std::vector vector; + vector.reserve(type->size()); + + for (const auto& childType : type->children()) { + vector.emplace_back(BaseVector::create(childType, vectorSize, pool)); + } + return vector; +} + +RowVectorPtr genTpcdsRowVector( + int32_t tableId, + memory::MemoryPool* pool, + RowTypePtr rowType, + std::vector>& tableDef) { + auto rowCount = tableDef[tableId]->rowIndex; + for (auto& child : tableDef[tableId]->children) { + child->resize(rowCount); + } + + return std::make_shared( + pool, + rowType, + BufferPtr(nullptr), + rowCount, + std::move(tableDef[tableId]->children)); +} + +RowVectorPtr genTpcdsTableData( + Table table, + memory::MemoryPool* pool, + size_t maxRows, + size_t offset, + double scaleFactor, + int32_t parallel, + int32_t child) { + // Create schema and allocate vectors. 
+ auto rowType = getTableSchema(table); + auto table_id = static_cast(table); + DSDGenIterator dsdGenIterator(scaleFactor, parallel, child); + size_t vectorSize = + getVectorSize(dsdGenIterator.getRowCount(table_id), maxRows, offset); + auto children = allocateVectors(rowType, vectorSize, pool); + dsdGenIterator.initTableOffset(table_id, offset); + dsdGenIterator.initializeTable(children, table_id); + auto& tableDef = dsdGenIterator.getTableDefs(); + for (size_t i = 0; i < vectorSize; ++i) { + dsdGenIterator.genRow(table_id, i + offset + 1); + } + return std::make_shared( + pool, + rowType, + BufferPtr(nullptr), + vectorSize, + std::move(tableDef[table_id]->children)); +} + +RowVectorPtr genTpcdsParentAndChildTable( + memory::MemoryPool* pool, + size_t maxRows, + size_t offset, + double scaleFactor, + int32_t parallel, + int32_t child, + Table parentTable, + Table childTable, + bool childTableCall) { + // Whenever a call to generate a table which is either marked as a + // parent or a child is requested, both the child and parent tables + // need to be populated. 
+ + auto parentTableType = getTableSchema(parentTable); + DSDGenIterator dsdGenIterator(scaleFactor, parallel, child); + size_t parentTableVectorSize = getVectorSize( + dsdGenIterator.getRowCount(static_cast(parentTable)), + maxRows, + offset); + + size_t parentTableUpperBound = parentTableVectorSize * 16; + auto children = allocateVectors(parentTableType, parentTableUpperBound, pool); + + auto childTableType = getTableSchema(childTable); + auto childChildren = + allocateVectors(childTableType, parentTableUpperBound, pool); + + auto parentTableId = static_cast(parentTable); + auto childTableId = static_cast(childTable); + + dsdGenIterator.initTableOffset(parentTableId, offset); + dsdGenIterator.initializeTable(children, parentTableId); + dsdGenIterator.initializeTable(childChildren, childTableId); + auto& tableDef = dsdGenIterator.getTableDefs(); + for (size_t i = 0; i < parentTableVectorSize; ++i) { + dsdGenIterator.genRow(parentTableId, i + offset + 1); + } + if (childTableCall) { + return genTpcdsRowVector(childTableId, pool, childTableType, tableDef); + } else { + return genTpcdsRowVector(parentTableId, pool, parentTableType, tableDef); + } +} +} // namespace + +static const RowTypePtr callCenterType = ROW( + {"cc_call_center_sk", "cc_call_center_id", + "cc_rec_start_date", "cc_rec_end_date", + "cc_closed_date_sk", "cc_open_date_sk", + "cc_name", "cc_class", + "cc_employees", "cc_sq_ft", + "cc_hours", "cc_manager", + "cc_mkt_id", "cc_mkt_class", + "cc_mkt_desc", "cc_market_manager", + "cc_division", "cc_division_name", + "cc_company", "cc_company_name", + "cc_street_number", "cc_street_name", + "cc_street_type", "cc_suite_number", + "cc_city", "cc_county", + "cc_state", "cc_zip", + "cc_country", "cc_gmt_offset", + "cc_tax_percentage"}, + {BIGINT(), VARCHAR(), DATE(), DATE(), INTEGER(), INTEGER(), + VARCHAR(), VARCHAR(), INTEGER(), INTEGER(), VARCHAR(), VARCHAR(), + INTEGER(), VARCHAR(), VARCHAR(), VARCHAR(), INTEGER(), VARCHAR(), + INTEGER(), VARCHAR(), 
VARCHAR(), VARCHAR(), VARCHAR(), VARCHAR(), + VARCHAR(), VARCHAR(), VARCHAR(), VARCHAR(), VARCHAR(), DECIMAL(5, 2), + DECIMAL(5, 2)}); + +static const RowTypePtr catalogPageType = + ROW({"cp_catalog_page_sk", + "cp_catalog_page_id", + "cp_start_date_sk", + "cp_end_date_sk", + "cp_department", + "cp_catalog_number", + "cp_catalog_page_number", + "cp_description", + "cp_type"}, + {BIGINT(), + VARCHAR(), + INTEGER(), + INTEGER(), + VARCHAR(), + INTEGER(), + INTEGER(), + VARCHAR(), + VARCHAR()}); + +static const RowTypePtr catalogReturnsType = ROW( + {"cr_returned_date_sk", + "cr_returned_time_sk", + "cr_item_sk", + "cr_refunded_customer_sk", + "cr_refunded_cdemo_sk", + "cr_refunded_hdemo_sk", + "cr_refunded_addr_sk", + "cr_returning_customer_sk", + "cr_returning_cdemo_sk", + "cr_returning_hdemo_sk", + "cr_returning_addr_sk", + "cr_call_center_sk", + "cr_catalog_page_sk", + "cr_ship_mode_sk", + "cr_warehouse_sk", + "cr_reason_sk", + "cr_order_number", + "cr_return_quantity", + "cr_return_amount", + "cr_return_tax", + "cr_return_amt_inc_tax", + "cr_fee", + "cr_return_ship_cost", + "cr_refunded_cash", + "cr_reversed_charge", + "cr_store_credit", + "cr_net_loss"}, + {BIGINT(), BIGINT(), BIGINT(), BIGINT(), BIGINT(), + BIGINT(), BIGINT(), BIGINT(), BIGINT(), BIGINT(), + BIGINT(), BIGINT(), BIGINT(), BIGINT(), BIGINT(), + BIGINT(), BIGINT(), INTEGER(), DECIMAL(7, 2), DECIMAL(7, 2), + DECIMAL(7, 2), DECIMAL(7, 2), DECIMAL(7, 2), DECIMAL(7, 2), DECIMAL(7, 2), + DECIMAL(7, 2), DECIMAL(7, 2)}); + +static const RowTypePtr catalogSalesType = ROW( + {"cs_sold_date_sk", + "cs_sold_time_sk", + "cs_ship_date_sk", + "cs_bill_customer_sk", + "cs_bill_cdemo_sk", + "cs_bill_hdemo_sk", + "cs_bill_addr_sk", + "cs_ship_customer_sk", + "cs_ship_cdemo_sk", + "cs_ship_hdemo_sk", + "cs_ship_addr_sk", + "cs_call_center_sk", + "cs_catalog_page_sk", + "cs_ship_mode_sk", + "cs_warehouse_sk", + "cs_item_sk", + "cs_promo_sk", + "cs_order_number", + "cs_quantity", + "cs_wholesale_cost", + 
"cs_list_price", + "cs_sales_price", + "cs_ext_discount_amt", + "cs_ext_sales_price", + "cs_ext_wholesale_cost", + "cs_ext_list_price", + "cs_ext_tax", + "cs_coupon_amt", + "cs_ext_ship_cost", + "cs_net_paid", + "cs_net_paid_inc_tax", + "cs_net_paid_inc_ship", + "cs_net_paid_inc_ship_tax", + "cs_net_profit"}, + {BIGINT(), BIGINT(), BIGINT(), BIGINT(), BIGINT(), + BIGINT(), BIGINT(), BIGINT(), BIGINT(), BIGINT(), + BIGINT(), BIGINT(), BIGINT(), BIGINT(), BIGINT(), + BIGINT(), BIGINT(), BIGINT(), INTEGER(), DECIMAL(7, 2), + DECIMAL(7, 2), DECIMAL(7, 2), DECIMAL(7, 2), DECIMAL(7, 2), DECIMAL(7, 2), + DECIMAL(7, 2), DECIMAL(7, 2), DECIMAL(7, 2), DECIMAL(7, 2), DECIMAL(7, 2), + DECIMAL(7, 2), DECIMAL(7, 2), DECIMAL(7, 2), DECIMAL(7, 2)}); + +static const RowTypePtr customerType = ROW( + { + "c_customer_sk", + "c_customer_id", + "c_current_cdemo_sk", + "c_current_hdemo_sk", + "c_current_addr_sk", + "c_first_shipto_date_sk", + "c_first_sales_date_sk", + "c_salutation", + "c_first_name", + "c_last_name", + "c_preferred_cust_flag", + "c_birth_day", + "c_birth_month", + "c_birth_year", + "c_birth_country", + "c_login", + "c_email_address", + "c_last_review_date_sk", + }, + { + BIGINT(), + VARCHAR(), + BIGINT(), + BIGINT(), + BIGINT(), + BIGINT(), + BIGINT(), + VARCHAR(), + VARCHAR(), + VARCHAR(), + VARCHAR(), + INTEGER(), + INTEGER(), + INTEGER(), + VARCHAR(), + VARCHAR(), + VARCHAR(), + BIGINT(), + }); + +static const RowTypePtr customerAddressType = + ROW({"ca_address_sk", + "ca_address_id", + "ca_street_number", + "ca_street_name", + "ca_street_type", + "ca_suite_number", + "ca_city", + "ca_county", + "ca_state", + "ca_zip", + "ca_country", + "ca_gmt_offset", + "ca_location_type"}, + {BIGINT(), + VARCHAR(), + VARCHAR(), + VARCHAR(), + VARCHAR(), + VARCHAR(), + VARCHAR(), + VARCHAR(), + VARCHAR(), + VARCHAR(), + VARCHAR(), + DECIMAL(5, 2), + VARCHAR()}); + +static const RowTypePtr customerDemographicsType = + ROW({"cd_demo_sk", + "cd_gender", + "cd_marital_status", + 
"cd_education_status", + "cd_purchase_estimate", + "cd_credit_rating", + "cd_dep_count", + "cd_dep_employed_count", + "cd_dep_college_count"}, + {BIGINT(), + VARCHAR(), + VARCHAR(), + VARCHAR(), + INTEGER(), + VARCHAR(), + INTEGER(), + INTEGER(), + INTEGER()}); + +static const RowTypePtr dateDimType = ROW( + { + "d_date_sk", + "d_date_id", + "d_date", + "d_month_seq", + "d_week_seq", + "d_quarter_seq", + "d_year", + "d_dow", + "d_moy", + "d_dom", + "d_qoy", + "d_fy_year", + "d_fy_quarter_seq", + "d_fy_week_seq", + "d_day_name", + "d_quarter_name", + "d_holiday", + "d_weekend", + "d_following_holiday", + "d_first_dom", + "d_last_dom", + "d_same_day_ly", + "d_same_day_lq", + "d_current_day", + "d_current_week", + "d_current_month", + "d_current_quarter", + "d_current_year", + }, + { + BIGINT(), VARCHAR(), DATE(), INTEGER(), INTEGER(), INTEGER(), + INTEGER(), INTEGER(), INTEGER(), INTEGER(), INTEGER(), INTEGER(), + INTEGER(), INTEGER(), VARCHAR(), VARCHAR(), VARCHAR(), VARCHAR(), + VARCHAR(), INTEGER(), INTEGER(), INTEGER(), INTEGER(), VARCHAR(), + VARCHAR(), VARCHAR(), VARCHAR(), VARCHAR(), + }); + +static const RowTypePtr householdDemographicsType = + ROW({"hd_demo_sk", + "hd_income_band_sk", + "hd_buy_potential", + "hd_dep_count", + "hd_vehicle_count"}, + {BIGINT(), BIGINT(), VARCHAR(), INTEGER(), INTEGER()}); + +static const RowTypePtr incomeBandType = + ROW({"ib_income_band_sk", "ib_lower_bound", "ib_upper_bound"}, + {BIGINT(), INTEGER(), INTEGER()}); + +static const RowTypePtr inventoryType = ROW( + {"inv_date_sk", "inv_item_sk", "inv_warehouse_sk", "inv_quantity_on_hand"}, + {BIGINT(), BIGINT(), BIGINT(), INTEGER()}); + +static const RowTypePtr itemType = ROW( + {"i_item_sk", "i_item_id", "i_rec_start_date", "i_rec_end_date", + "i_item_desc", "i_current_price", "i_wholesale_cost", "i_brand_id", + "i_brand", "i_class_id", "i_class", "i_category_id", + "i_category", "i_manufact_id", "i_manufact", "i_size", + "i_formulation", "i_color", "i_units", "i_container", + 
"i_manager_id", "i_product_name"}, + {BIGINT(), VARCHAR(), DATE(), DATE(), VARCHAR(), DECIMAL(7, 2), + DECIMAL(7, 2), INTEGER(), VARCHAR(), INTEGER(), VARCHAR(), INTEGER(), + VARCHAR(), INTEGER(), VARCHAR(), VARCHAR(), VARCHAR(), VARCHAR(), + VARCHAR(), VARCHAR(), INTEGER(), VARCHAR()}); + +static const RowTypePtr promotionType = + ROW({"p_promo_sk", + "p_promo_id", + "p_start_date_sk", + "p_end_date_sk", + "p_item_sk", + "p_cost", + "p_response_targe", + "p_promo_name", + "p_channel_dmail", + "p_channel_email", + "p_channel_catalog", + "p_channel_tv", + "p_channel_radio", + "p_channel_press", + "p_channel_event", + "p_channel_demo", + "p_channel_details", + "p_purpose", + "p_discount_active"}, + {BIGINT(), + VARCHAR(), + BIGINT(), + BIGINT(), + BIGINT(), + DECIMAL(15, 2), + INTEGER(), + VARCHAR(), + VARCHAR(), + VARCHAR(), + VARCHAR(), + VARCHAR(), + VARCHAR(), + VARCHAR(), + VARCHAR(), + VARCHAR(), + VARCHAR(), + VARCHAR(), + VARCHAR()}); + +static const RowTypePtr reasonType = + ROW({"r_reason_sk", "r_reason_id", "r_reason_desc"}, + {BIGINT(), VARCHAR(), VARCHAR()}); + +static const RowTypePtr shipModeType = + ROW({"sm_ship_mode_sk", + "sm_ship_mode_id", + "sm_type", + "sm_code", + "sm_carrier", + "sm_contract"}, + {BIGINT(), VARCHAR(), VARCHAR(), VARCHAR(), VARCHAR(), VARCHAR()}); + +static const RowTypePtr storeType = ROW( + { + "s_store_sk", + "s_store_id", + "s_rec_start_date", + "s_rec_end_date", + "s_closed_date_sk", + "s_store_name", + "s_number_employees", + "s_floor_space", + "s_hours", + "s_manager", + "s_market_id", + "s_geography_class", + "s_market_desc", + "s_market_manager", + "s_division_id", + "s_division_name", + "s_company_id", + "s_company_name", + "s_street_number", + "s_street_name", + "s_street_type", + "s_suite_number", + "s_city", + "s_county", + "s_state", + "s_zip", + "s_country", + "s_gmt_offset", + "s_tax_precentage", + }, + { + BIGINT(), VARCHAR(), DATE(), DATE(), BIGINT(), + VARCHAR(), INTEGER(), INTEGER(), VARCHAR(), VARCHAR(), + 
INTEGER(), VARCHAR(), VARCHAR(), VARCHAR(), INTEGER(), + VARCHAR(), INTEGER(), VARCHAR(), VARCHAR(), VARCHAR(), + VARCHAR(), VARCHAR(), VARCHAR(), VARCHAR(), VARCHAR(), + VARCHAR(), VARCHAR(), DECIMAL(5, 2), DECIMAL(5, 2), + }); + +static const RowTypePtr storeReturnsType = + ROW({"sr_returned_date_sk", + "sr_return_time_sk", + "sr_item_sk", + "sr_customer_sk", + "sr_cdemo_sk", + "sr_hdemo_sk", + "sr_addr_sk", + "sr_store_sk", + "sr_reason_sk", + "sr_ticket_number", + "sr_return_quantity", + "sr_return_amt", + "sr_return_tax", + "sr_return_amt_inc_tax", + "sr_fee", + "sr_return_ship_cost", + "sr_refunded_cash", + "sr_reversed_charge", + "sr_store_credit", + "sr_net_loss"}, + {BIGINT(), BIGINT(), BIGINT(), BIGINT(), + BIGINT(), BIGINT(), BIGINT(), BIGINT(), + BIGINT(), BIGINT(), INTEGER(), DECIMAL(7, 2), + DECIMAL(7, 2), DECIMAL(7, 2), DECIMAL(7, 2), DECIMAL(7, 2), + DECIMAL(7, 2), DECIMAL(7, 2), DECIMAL(7, 2), DECIMAL(7, 2)}); + +static const RowTypePtr storeSalesType = ROW( + {"ss_sold_date_sk", "ss_sold_time_sk", "ss_item_sk", + "ss_customer_sk", "ss_cdemo_sk", "ss_hdemo_sk", + "ss_addr_sk", "ss_store_sk", "ss_promo_sk", + "ss_ticket_number", "ss_quantity", "ss_wholesale_cost", + "ss_list_price", "ss_sales_price", "ss_ext_discount_amt", + "ss_ext_sales_price", "ss_ext_wholesale_cost", "ss_ext_list_price", + "ss_ext_tax", "ss_coupon_amt", "ss_net_paid", + "ss_net_paid_inc_tax", "ss_net_profit"}, + {BIGINT(), BIGINT(), BIGINT(), BIGINT(), BIGINT(), + BIGINT(), BIGINT(), BIGINT(), BIGINT(), BIGINT(), + INTEGER(), DECIMAL(7, 2), DECIMAL(7, 2), DECIMAL(7, 2), DECIMAL(7, 2), + DECIMAL(7, 2), DECIMAL(7, 2), DECIMAL(7, 2), DECIMAL(7, 2), DECIMAL(7, 2), + DECIMAL(7, 2), DECIMAL(7, 2), DECIMAL(7, 2)}); + +static const RowTypePtr timeDimType = + ROW({"t_time_sk", + "t_time_id", + "t_time", + "t_hour", + "t_minute", + "t_second", + "t_am_pm", + "t_shift", + "t_sub_shift", + "t_meal_time"}, + {BIGINT(), + VARCHAR(), + INTEGER(), + INTEGER(), + INTEGER(), + INTEGER(), + 
VARCHAR(), + VARCHAR(), + VARCHAR(), + VARCHAR()}); + +static const RowTypePtr warehouseType = + ROW({"w_warehouse_sk", + "w_warehouse_id", + "w_warehouse_name", + "w_warehouse_sq_ft", + "w_street_number", + "w_street_name", + "w_street_type", + "w_suite_number", + "w_city", + "w_county", + "w_state", + "w_zip", + "w_country", + "w_gmt_offset"}, + {BIGINT(), + VARCHAR(), + VARCHAR(), + INTEGER(), + VARCHAR(), + VARCHAR(), + VARCHAR(), + VARCHAR(), + VARCHAR(), + VARCHAR(), + VARCHAR(), + VARCHAR(), + VARCHAR(), + DECIMAL(5, 2)}); + +static const RowTypePtr webPageType = + ROW({"wp_web_page_sk", + "wp_web_page_id", + "wp_rec_start_date", + "wp_rec_end_date", + "wp_creation_date_sk", + "wp_access_date_sk", + "wp_autogen_flag", + "wp_customer_sk", + "wp_url", + "wp_type", + "wp_char_count", + "wp_link_count", + "wp_image_count", + "wp_max_ad_count"}, + {BIGINT(), + VARCHAR(), + DATE(), + DATE(), + BIGINT(), + BIGINT(), + VARCHAR(), + BIGINT(), + VARCHAR(), + VARCHAR(), + INTEGER(), + INTEGER(), + INTEGER(), + INTEGER()}); + +static const RowTypePtr webReturnsType = ROW( + {"wr_returned_date_sk", + "wr_returned_time_sk", + "wr_item_sk", + "wr_refunded_customer_sk", + "wr_refunded_cdemo_sk", + "wr_refunded_hdemo_sk", + "wr_refunded_addr_sk", + "wr_returning_customer_sk", + "wr_returning_cdemo_sk", + "wr_returning_hdemo_sk", + "wr_returning_addr_sk", + "wr_web_page_sk", + "wr_reason_sk", + "wr_order_number", + "wr_return_quantity", + "wr_return_amt", + "wr_return_tax", + "wr_return_amt_inc_tax", + "wr_fee", + "wr_return_ship_cost", + "wr_refunded_cash", + "wr_reversed_charge", + "wr_account_credit", + "wr_net_loss"}, + {BIGINT(), BIGINT(), BIGINT(), BIGINT(), BIGINT(), + BIGINT(), BIGINT(), BIGINT(), BIGINT(), BIGINT(), + BIGINT(), BIGINT(), BIGINT(), BIGINT(), INTEGER(), + DECIMAL(7, 2), DECIMAL(7, 2), DECIMAL(7, 2), DECIMAL(7, 2), DECIMAL(7, 2), + DECIMAL(7, 2), DECIMAL(7, 2), DECIMAL(7, 2), DECIMAL(7, 2)}); + +static const RowTypePtr webSalesType = ROW( + 
{"ws_sold_date_sk", + "ws_sold_time_sk", + "ws_ship_date_sk", + "ws_item_sk", + "ws_bill_customer_sk", + "ws_bill_cdemo_sk", + "ws_bill_hdemo_sk", + "ws_bill_addr_sk", + "ws_ship_customer_sk", + "ws_ship_cdemo_sk", + "ws_ship_hdemo_sk", + "ws_ship_addr_sk", + "ws_web_page_sk", + "ws_web_site_sk", + "ws_ship_mode_sk", + "ws_warehouse_sk", + "ws_promo_sk", + "ws_order_number", + "ws_quantity", + "ws_wholesale_cost", + "ws_list_price", + "ws_sales_price", + "ws_ext_discount_amt", + "ws_ext_sales_price", + "ws_ext_wholesale_cost", + "ws_ext_list_price", + "ws_ext_tax", + "ws_coupon_amt", + "ws_ext_ship_cost", + "ws_net_paid", + "ws_net_paid_inc_tax", + "ws_net_paid_inc_ship", + "ws_net_paid_inc_ship_tax", + "ws_net_profit"}, + {BIGINT(), BIGINT(), BIGINT(), BIGINT(), BIGINT(), + BIGINT(), BIGINT(), BIGINT(), BIGINT(), BIGINT(), + BIGINT(), BIGINT(), BIGINT(), BIGINT(), BIGINT(), + BIGINT(), BIGINT(), BIGINT(), INTEGER(), DECIMAL(7, 2), + DECIMAL(7, 2), DECIMAL(7, 2), DECIMAL(7, 2), DECIMAL(7, 2), DECIMAL(7, 2), + DECIMAL(7, 2), DECIMAL(7, 2), DECIMAL(7, 2), DECIMAL(7, 2), DECIMAL(7, 2), + DECIMAL(7, 2), DECIMAL(7, 2), DECIMAL(7, 2), DECIMAL(7, 2)}); + +static const RowTypePtr websiteType = ROW( + {"web_site_sk", "web_site_id", "web_rec_start_date", + "web_rec_end_date", "web_name", "web_open_date_sk", + "web_close_date_sk", "web_class", "web_manager", + "web_mkt_id", "web_mkt_class", "web_mkt_desc", + "web_market_manager", "web_company_id", "web_company_name", + "web_street_number", "web_street_name", "web_street_type", + "web_suite_number", "web_city", "web_county", + "web_state", "web_zip", "web_country", + "web_gmt_offset", "web_tax_percentage"}, + {BIGINT(), VARCHAR(), DATE(), DATE(), VARCHAR(), BIGINT(), + BIGINT(), VARCHAR(), VARCHAR(), INTEGER(), VARCHAR(), VARCHAR(), + VARCHAR(), INTEGER(), VARCHAR(), VARCHAR(), VARCHAR(), VARCHAR(), + VARCHAR(), VARCHAR(), VARCHAR(), VARCHAR(), VARCHAR(), VARCHAR(), + DECIMAL(5, 2), DECIMAL(5, 2)}); + +const RowTypePtr 
getTableSchema(Table table) { + switch (table) { + case Table::TBL_CALL_CENTER: + return callCenterType; + case Table::TBL_CATALOG_PAGE: + return catalogPageType; + case Table::TBL_CATALOG_RETURNS: + return catalogReturnsType; + case Table::TBL_CATALOG_SALES: + return catalogSalesType; + case Table::TBL_CUSTOMER: + return customerType; + case Table::TBL_CUSTOMER_ADDRESS: + return customerAddressType; + case Table::TBL_CUSTOMER_DEMOGRAPHICS: + return customerDemographicsType; + case Table::TBL_DATE_DIM: + return dateDimType; + case Table::TBL_HOUSEHOLD_DEMOGRAPHICS: + return householdDemographicsType; + case Table::TBL_INCOME_BAND: + return incomeBandType; + case Table::TBL_INVENTORY: + return inventoryType; + case Table::TBL_ITEM: + return itemType; + case Table::TBL_PROMOTION: + return promotionType; + case Table::TBL_REASON: + return reasonType; + case Table::TBL_SHIP_MODE: + return shipModeType; + case Table::TBL_STORE: + return storeType; + case Table::TBL_STORE_RETURNS: + return storeReturnsType; + case Table::TBL_STORE_SALES: + return storeSalesType; + case Table::TBL_TIME_DIM: + return timeDimType; + case Table::TBL_WAREHOUSE: + return warehouseType; + case Table::TBL_WEB_PAGE: + return webPageType; + case Table::TBL_WEB_RETURNS: + return webReturnsType; + case Table::TBL_WEB_SALES: + return webSalesType; + case Table::TBL_WEB_SITE: + return websiteType; + default: + VELOX_UNREACHABLE(); + } +} + +std::string toTableName(Table table) { + switch (table) { + case Table::TBL_CALL_CENTER: + return "call_center"; + case Table::TBL_CATALOG_PAGE: + return "catalog_page"; + case Table::TBL_CATALOG_RETURNS: + return "catalog_returns"; + case Table::TBL_CATALOG_SALES: + return "catalog_sales"; + case Table::TBL_CUSTOMER: + return "customer"; + case Table::TBL_CUSTOMER_ADDRESS: + return "customer_address"; + case Table::TBL_CUSTOMER_DEMOGRAPHICS: + return "customer_demographics"; + case Table::TBL_DATE_DIM: + return "date_dim"; + case Table::TBL_HOUSEHOLD_DEMOGRAPHICS: 
+ return "household_demographics"; + case Table::TBL_INCOME_BAND: + return "income_band"; + case Table::TBL_INVENTORY: + return "inventory"; + case Table::TBL_ITEM: + return "item"; + case Table::TBL_PROMOTION: + return "promotion"; + case Table::TBL_REASON: + return "reason"; + case Table::TBL_SHIP_MODE: + return "ship_mode"; + case Table::TBL_STORE: + return "store"; + case Table::TBL_STORE_RETURNS: + return "store_returns"; + case Table::TBL_STORE_SALES: + return "store_sales"; + case Table::TBL_TIME_DIM: + return "time_dim"; + case Table::TBL_WAREHOUSE: + return "warehouse"; + case Table::TBL_WEB_PAGE: + return "web_page"; + case Table::TBL_WEB_RETURNS: + return "web_returns"; + case Table::TBL_WEB_SALES: + return "web_sales"; + case Table::TBL_WEB_SITE: + return "web_site"; + default: + VELOX_UNREACHABLE(); + } +} + +Table fromTableName(const std::string_view& tableName) { + static const std::unordered_map map{ + {"call_center", Table::TBL_CALL_CENTER}, + {"catalog_page", Table::TBL_CATALOG_PAGE}, + {"catalog_returns", Table::TBL_CATALOG_RETURNS}, + {"catalog_sales", Table::TBL_CATALOG_SALES}, + {"customer", Table::TBL_CUSTOMER}, + {"customer_address", Table::TBL_CUSTOMER_ADDRESS}, + {"customer_demographics", Table::TBL_CUSTOMER_DEMOGRAPHICS}, + {"date_dim", Table::TBL_DATE_DIM}, + {"household_demographics", Table::TBL_HOUSEHOLD_DEMOGRAPHICS}, + {"income_band", Table::TBL_INCOME_BAND}, + {"inventory", Table::TBL_INVENTORY}, + {"item", Table::TBL_ITEM}, + {"promotion", Table::TBL_PROMOTION}, + {"reason", Table::TBL_REASON}, + {"ship_mode", Table::TBL_SHIP_MODE}, + {"store", Table::TBL_STORE}, + {"store_returns", Table::TBL_STORE_RETURNS}, + {"store_sales", Table::TBL_STORE_SALES}, + {"time_dim", Table::TBL_TIME_DIM}, + {"warehouse", Table::TBL_WAREHOUSE}, + {"web_page", Table::TBL_WEB_PAGE}, + {"web_returns", Table::TBL_WEB_RETURNS}, + {"web_sales", Table::TBL_WEB_SALES}, + {"web_site", Table::TBL_WEB_SITE}, + }; + + auto it = map.find(tableName); + if (it != 
map.end()) { + return it->second; + } + throw std::invalid_argument( + fmt::format("Invalid TPC-DS table name: '{}'", tableName)); +} + +RowVectorPtr genTpcdsData( + Table table, + size_t maxRows, + size_t offset, + memory::MemoryPool* pool, + double scaleFactor, + int32_t parallel, + int32_t child) { + switch (table) { + Table parentTable, childTable; + case Table::TBL_CATALOG_RETURNS: + case Table::TBL_CATALOG_SALES: + parentTable = Table::TBL_CATALOG_SALES; + childTable = Table::TBL_CATALOG_RETURNS; + return genTpcdsParentAndChildTable( + pool, + maxRows, + offset, + scaleFactor, + parallel, + child, + parentTable, + childTable, + table == childTable); + case Table::TBL_WEB_RETURNS: + case Table::TBL_WEB_SALES: + parentTable = Table::TBL_WEB_SALES; + childTable = Table::TBL_WEB_RETURNS; + return genTpcdsParentAndChildTable( + pool, + maxRows, + offset, + scaleFactor, + parallel, + child, + parentTable, + childTable, + table == childTable); + case Table::TBL_STORE_RETURNS: + case Table::TBL_STORE_SALES: + parentTable = Table::TBL_STORE_SALES; + childTable = Table::TBL_STORE_RETURNS; + return genTpcdsParentAndChildTable( + pool, + maxRows, + offset, + scaleFactor, + parallel, + child, + parentTable, + childTable, + table == childTable); + case Table::TBL_CALL_CENTER: + case Table::TBL_CATALOG_PAGE: + case Table::TBL_CUSTOMER: + case Table::TBL_CUSTOMER_ADDRESS: + case Table::TBL_CUSTOMER_DEMOGRAPHICS: + case Table::TBL_DATE_DIM: + case Table::TBL_HOUSEHOLD_DEMOGRAPHICS: + case Table::TBL_INCOME_BAND: + case Table::TBL_INVENTORY: + case Table::TBL_ITEM: + case Table::TBL_PROMOTION: + case Table::TBL_REASON: + case Table::TBL_SHIP_MODE: + case Table::TBL_STORE: + case Table::TBL_TIME_DIM: + case Table::TBL_WAREHOUSE: + case Table::TBL_WEB_PAGE: + case Table::TBL_WEB_SITE: + return genTpcdsTableData( + table, pool, maxRows, offset, scaleFactor, parallel, child); + default: + VELOX_UNREACHABLE(); + } +} +} // namespace facebook::presto::connector::tpcds diff --git 
a/presto-native-execution/presto_cpp/main/connectors/tpcds/TpcdsGen.h b/presto-native-execution/presto_cpp/main/connectors/tpcds/TpcdsGen.h new file mode 100644 index 0000000000000..a2df6a340e213 --- /dev/null +++ b/presto-native-execution/presto_cpp/main/connectors/tpcds/TpcdsGen.h @@ -0,0 +1,99 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/common/memory/Memory.h" + +namespace facebook::velox { +class RowVector; +using RowVectorPtr = std::shared_ptr; +class RowType; +using RowTypePtr = std::shared_ptr; +} // namespace facebook::velox + +namespace facebook::presto::connector::tpcds { + +/// This file uses TPC-DS DSDGEN to generate data encoded using Velox Vectors. 
+
+/// Identifies one of the 24 TPC-DS tables this connector can generate.
+///
+/// The generator casts a Table value to an integer and passes it to
+/// DSDGenIterator as the table id, so the numeric value of each enumerator is
+/// significant. NOTE(review): presumably the declaration order has to match
+/// the table numbering used by DSDGenIterator/dsdgen - confirm before
+/// inserting or reordering enumerators.
+enum class Table : uint8_t {
+  TBL_CALL_CENTER,
+  TBL_CATALOG_PAGE,
+  TBL_CATALOG_RETURNS,
+  TBL_CATALOG_SALES,
+  TBL_CUSTOMER,
+  TBL_CUSTOMER_ADDRESS,
+  TBL_CUSTOMER_DEMOGRAPHICS,
+  TBL_DATE_DIM,
+  TBL_HOUSEHOLD_DEMOGRAPHICS,
+  TBL_INCOME_BAND,
+  TBL_INVENTORY,
+  TBL_ITEM,
+  TBL_PROMOTION,
+  TBL_REASON,
+  TBL_SHIP_MODE,
+  TBL_STORE,
+  TBL_STORE_RETURNS,
+  TBL_STORE_SALES,
+  TBL_TIME_DIM,
+  TBL_WAREHOUSE,
+  TBL_WEB_PAGE,
+  TBL_WEB_RETURNS,
+  TBL_WEB_SALES,
+  TBL_WEB_SITE
+};
+
+/// Every Table enumerator, listed in declaration order, for callers that need
+/// to iterate over all tables. The type is deduced from the braced list.
+/// Because this is a `static` object defined in a header, each translation
+/// unit that includes this file gets its own copy.
+static const auto tables = {
+    tpcds::Table::TBL_CALL_CENTER,
+    tpcds::Table::TBL_CATALOG_PAGE,
+    tpcds::Table::TBL_CATALOG_RETURNS,
+    tpcds::Table::TBL_CATALOG_SALES,
+    tpcds::Table::TBL_CUSTOMER,
+    tpcds::Table::TBL_CUSTOMER_ADDRESS,
+    tpcds::Table::TBL_CUSTOMER_DEMOGRAPHICS,
+    tpcds::Table::TBL_DATE_DIM,
+    tpcds::Table::TBL_HOUSEHOLD_DEMOGRAPHICS,
+    tpcds::Table::TBL_INCOME_BAND,
+    tpcds::Table::TBL_INVENTORY,
+    tpcds::Table::TBL_ITEM,
+    tpcds::Table::TBL_PROMOTION,
+    tpcds::Table::TBL_REASON,
+    tpcds::Table::TBL_SHIP_MODE,
+    tpcds::Table::TBL_STORE,
+    tpcds::Table::TBL_STORE_RETURNS,
+    tpcds::Table::TBL_STORE_SALES,
+    tpcds::Table::TBL_TIME_DIM,
+    tpcds::Table::TBL_WAREHOUSE,
+    tpcds::Table::TBL_WEB_PAGE,
+    tpcds::Table::TBL_WEB_RETURNS,
+    tpcds::Table::TBL_WEB_SALES,
+    tpcds::Table::TBL_WEB_SITE};
+
+/// Returns the lower-case, underscore-separated TPC-DS name of `table`, for
+/// example "call_center" for Table::TBL_CALL_CENTER.
+std::string toTableName(Table table);
+
+/// Returns the schema (RowType) for a particular TPC-DS table.
+const velox::RowTypePtr getTableSchema(Table table);
+
+/// Returns the Table whose name (as produced by toTableName, for example
+/// "store_sales") is exactly `tableName`. Throws std::invalid_argument when
+/// the name does not match any TPC-DS table.
+Table fromTableName(const std::string_view& tableName);
+
+/// Generates rows of `table` as a RowVector whose columns follow
+/// getTableSchema(table).
+///
+/// For the sales/returns pairs (catalog, web and store) the sales table is the
+/// parent and the returns table is the child: both are generated together and
+/// the vector for the requested one is returned. Its row count is whatever the
+/// generator produced while walking the parent rows, so it is not necessarily
+/// `maxRows`. For every other table the result holds
+/// min(rowCount - offset, maxRows) rows, or no rows when `offset` is at or
+/// past the table's row count.
+///
+/// @param table Table to generate.
+/// @param maxRows Upper bound on the number of (parent) rows generated.
+/// @param offset Zero-based index of the first row to generate.
+/// @param pool Memory pool used to allocate the output vectors.
+/// @param scaleFactor Forwarded to DSDGenIterator.
+/// @param parallel Forwarded to DSDGenIterator. NOTE(review): presumably the
+/// total number of parallel generation chunks - confirm.
+/// @param child Forwarded to DSDGenIterator. NOTE(review): presumably the
+/// index of this chunk among `parallel` - confirm.
+velox::RowVectorPtr genTpcdsData(
+    Table table,
+    size_t maxRows,
+    size_t offset,
+    velox::memory::MemoryPool* pool,
+    double scaleFactor,
+    int32_t parallel,
+    int32_t child);
+} // namespace facebook::presto::connector::tpcds
diff --git a/presto-native-execution/presto_cpp/main/connectors/tpcds/utils/append_info-c.cpp b/presto-native-execution/presto_cpp/main/connectors/tpcds/utils/append_info-c.cpp
new file mode 100644
index 0000000000000..72edd76819806
--- /dev/null
+++ b/presto-native-execution/presto_cpp/main/connectors/tpcds/utils/append_info-c.cpp
@@ -0,0 +1,158 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "append_info-c.h"
+
+// NOTE(review): the header name of the next include is missing in this copy
+// of the patch; restore it from the original commit.
+#include
+#include "presto_cpp/external/dsdgen/include/dsdgen-c/append_info.h"
+#include "presto_cpp/external/dsdgen/include/dsdgen-c/config.h"
+#include "presto_cpp/external/dsdgen/include/dsdgen-c/date.h"
+#include "presto_cpp/external/dsdgen/include/dsdgen-c/nulls.h"
+#include "presto_cpp/external/dsdgen/include/dsdgen-c/porting.h"
+#include "velox/vector/FlatVector.h"
+
+// This file implements the append_* callbacks through which the dsdgen code
+// hands over generated values. Each callback receives an opaque append_info
+// that is really a TpcdsTableDef, writes one value (or a null) into the child
+// vector at colIndex, row rowIndex, and then advances colIndex.
+
+using namespace facebook::velox;
+using namespace facebook::presto::connector::tpcds;
+// Returns the entry for `table_id` from `info_list`, which is reinterpreted as
+// a pointer to a std::vector of owning pointers. The index is not
+// bounds-checked.
+append_info* append_info_get(void* info_list, int table_id) {
+  auto& append_vector =
+      *((std::vector>*)info_list);
+  return (append_info*)append_vector[table_id].get();
+}
+
+// Asks dsdgen's nullCheck whether `column` is null for the row currently
+// being generated in this table's DSDGenContext.
+bool facebook::presto::connector::tpcds::TpcdsTableDef::IsNull(int32_t column) {
+  return nullCheck(column, *dsdGenContext);
+}
+
+// Called at the start of a row.
+// NOTE(review): the cast result is never used, so this function currently has
+// no effect.
+void append_row_start(append_info info) {
+  auto append_info = (TpcdsTableDef*)info;
+}
+
+// Called at the end of a row: wraps the column cursor (back to 0 when exactly
+// children.size() values were appended) and moves on to the next row.
+void append_row_end(append_info info) {
+  auto append_info = (TpcdsTableDef*)info;
+  append_info->colIndex %= append_info->children.size();
+  append_info->rowIndex++;
+}
+
+// Appends a string value. A null is written only when fillEmptyStringAsNull
+// is true AND at least one of these holds: dsdgen marks the column null,
+// `value` is a null pointer, or `value` is the empty string. Otherwise
+// `value` is written as is.
+// NOTE(review): with fillEmptyStringAsNull == false a null `value` pointer
+// reaches set(), and a column that dsdgen marked null is still written -
+// confirm both are intended.
+void append_varchar(
+    int32_t column,
+    append_info info,
+    const char* value,
+    bool fillEmptyStringAsNull) {
+  auto append_info = (TpcdsTableDef*)info;
+  if (((append_info->IsNull(column)) || (!value) || (*value == '\0')) &&
+      (fillEmptyStringAsNull)) {
+    append_info->children[append_info->colIndex]->setNull(
+        append_info->rowIndex, true);
+  } else {
+    append_info->children[append_info->colIndex]
+        ->asFlatVector()
+        ->set(append_info->rowIndex, value);
+  }
+  append_info->colIndex++;
+}
+
+// std::string overload; forwards to the const char* overload above.
+void append_varchar(
+    int32_t column,
+    append_info info,
+    std::string value,
+    bool fillEmptyStringAsNull) {
+  append_varchar(column, info, value.data(), fillEmptyStringAsNull);
+}
+
+// Appends a key value. A null is written when dsdgen marks the column null
+// or when `value` is negative.
+void append_key(int32_t column, append_info info, int64_t value) {
+  auto append_info = (TpcdsTableDef*)info;
+  if (append_info->IsNull(column) || value < 0) {
+    append_info->children[append_info->colIndex]->setNull(
+        append_info->rowIndex, true);
+  } else {
+    append_info->children[append_info->colIndex]->asFlatVector()->set(
+        append_info->rowIndex, value);
+  }
+  append_info->colIndex++;
+}
+
+// Appends a 32-bit integer value, or a null when dsdgen marks the column
+// null.
+void append_integer(int32_t column, append_info info, int32_t value) {
+  auto append_info = (TpcdsTableDef*)info;
+  if (append_info->IsNull(column)) {
+    append_info->children[append_info->colIndex]->setNull(
+        append_info->rowIndex, true);
+  } else {
+    append_info->children[append_info->colIndex]->asFlatVector()->set(
+        append_info->rowIndex, value);
+  }
+  append_info->colIndex++;
+}
+
+// Appends a boolean (any non-zero `value` is stored as true), or a null when
+// dsdgen marks the column null.
+void append_boolean(int32_t column, append_info info, int32_t value) {
+  auto append_info = (TpcdsTableDef*)info;
+  if (append_info->IsNull(column)) {
+    append_info->children[append_info->colIndex]->setNull(
+        append_info->rowIndex, true);
+  } else {
+    append_info->children[append_info->colIndex]->asFlatVector()->set(
+        append_info->rowIndex, value != 0);
+  }
+  append_info->colIndex++;
+}
+
+// Appends a date. A null is written when dsdgen marks the column null or
+// when `value` is negative. Otherwise the Julian day number is converted to
+// year/month/day with jtodt, formatted as "year-month-day" and converted to
+// the DATE type's day count.
+// NOTE(review): month and day are formatted without zero padding - confirm
+// that DATE()->toDays accepts that form.
+// value is a Julian date
+// FIXME: direct int conversion, offsets should be constant
+void append_date(int32_t column, append_info info, int64_t value) {
+  auto append_info = (TpcdsTableDef*)info;
+  if (append_info->IsNull(column) || value < 0) {
+    append_info->children[append_info->colIndex]->setNull(
+        append_info->rowIndex, true);
+  } else {
+    date_t dTemp;
+    jtodt(&dTemp, (int)value);
+    auto stringDate =
+        fmt::format("{}-{}-{}", dTemp.year, dTemp.month, dTemp.day);
+    auto date = DATE()->toDays(stringDate);
+    append_info->children[append_info->colIndex]->asFlatVector()->set(
+        append_info->rowIndex, date);
+  }
+  append_info->colIndex++;
+}
+
+// Appends the unscaled value val->number of a decimal, or a null when dsdgen
+// marks the column null. The branch is chosen by whether the target column
+// is a short decimal.
+// NOTE(review): the element types passed to asFlatVector are not legible in
+// this copy; presumably a 64-bit value for short decimals and a 128-bit
+// value otherwise - confirm against the original commit.
+void append_decimal(int32_t column, append_info info, decimal_t* val) {
+  auto append_info = (TpcdsTableDef*)info;
+  if (append_info->IsNull(column)) {
+    append_info->children[append_info->colIndex]->setNull(
+        append_info->rowIndex, true);
+  } else {
+    auto type = append_info->children[append_info->colIndex]->type();
+    if (type->isShortDecimal()) {
+      append_info->children[append_info->colIndex]
+          ->asFlatVector()
+          ->set(append_info->rowIndex, val->number);
+    } else {
+      append_info->children[append_info->colIndex]
+          ->asFlatVector()
+          ->set(append_info->rowIndex, val->number);
+    }
+  }
+  append_info->colIndex++;
+}
+
+// Appends an integer into a decimal column by storing value * 100 (widened
+// to int64_t before the multiplication), or a null when dsdgen marks the
+// column null.
+// NOTE(review): the factor 100 presumably corresponds to a decimal scale of
+// 2 - confirm that every column fed through this function has scale 2.
+void append_integer_decimal(int32_t column, append_info info, int32_t value) {
+  auto append_info = (TpcdsTableDef*)info;
+  if (append_info->IsNull(column)) {
+    append_info->children[append_info->colIndex]->setNull(
+        append_info->rowIndex, true);
+  } else {
+    append_info->children[append_info->colIndex]->asFlatVector()->set(
+        append_info->rowIndex, (int64_t)value * 100);
+  }
+  append_info->colIndex++;
+}
diff --git a/presto-native-execution/presto_cpp/main/connectors/tpcds/utils/append_info-c.h b/presto-native-execution/presto_cpp/main/connectors/tpcds/utils/append_info-c.h
new file mode 100644
index 0000000000000..a02e292c6a859
--- /dev/null
+++ b/presto-native-execution/presto_cpp/main/connectors/tpcds/utils/append_info-c.h
@@ -0,0 +1,38 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +#pragma once + +#include +#include "presto_cpp/external/dsdgen/include/dsdgen-c/dist.h" + +namespace facebook::velox { +class BaseVector; +using VectorPtr = std::shared_ptr; +} // namespace facebook::velox + +namespace facebook::presto::connector::tpcds { + +struct TpcdsTableDef { + const char* name; + int fl_small; + int fl_child; + int first_column; + int colIndex = 0; + int rowIndex = 0; + DSDGenContext* dsdGenContext; + std::vector children; + bool IsNull(int32_t column); +}; +} // namespace facebook::presto::connector::tpcds diff --git a/presto-native-execution/presto_cpp/main/tests/CMakeLists.txt b/presto-native-execution/presto_cpp/main/tests/CMakeLists.txt index 9635db335915b..2d8e2ea9b0df5 100644 --- a/presto-native-execution/presto_cpp/main/tests/CMakeLists.txt +++ b/presto-native-execution/presto_cpp/main/tests/CMakeLists.txt @@ -38,6 +38,7 @@ target_link_libraries( $ $ velox_hive_connector + presto_tpcds_connector velox_tpch_connector velox_presto_serializer velox_functions_prestosql diff --git a/presto-native-execution/presto_cpp/main/types/PrestoToVeloxConnector.cpp b/presto-native-execution/presto_cpp/main/types/PrestoToVeloxConnector.cpp index faa4f93eaf67a..91d7f30bf5ec8 100644 --- a/presto-native-execution/presto_cpp/main/types/PrestoToVeloxConnector.cpp +++ b/presto-native-execution/presto_cpp/main/types/PrestoToVeloxConnector.cpp @@ -14,6 +14,8 @@ #include "presto_cpp/main/types/PrestoToVeloxConnector.h" #include +#include "presto_cpp/main/connectors/tpcds/TpcdsConnector.h" +#include "presto_cpp/main/connectors/tpcds/TpcdsConnectorSplit.h" #include "velox/connectors/hive/HiveConnector.h" #include "velox/connectors/hive/HiveConnectorSplit.h" #include "velox/connectors/hive/HiveDataSink.h" @@ -23,6 +25,7 @@ #include "velox/connectors/tpch/TpchConnector.h" #include "velox/connectors/tpch/TpchConnectorSplit.h" +using namespace facebook::velox; namespace facebook::presto { namespace { @@ -106,15 +109,16 @@ TypePtr stringToType( return 
typeParser.parse(typeString); } -connector::hive::HiveColumnHandle::ColumnType toHiveColumnType( +velox::connector::hive::HiveColumnHandle::ColumnType toHiveColumnType( protocol::ColumnType type) { switch (type) { case protocol::ColumnType::PARTITION_KEY: - return connector::hive::HiveColumnHandle::ColumnType::kPartitionKey; + return velox::connector::hive::HiveColumnHandle::ColumnType:: + kPartitionKey; case protocol::ColumnType::REGULAR: - return connector::hive::HiveColumnHandle::ColumnType::kRegular; + return velox::connector::hive::HiveColumnHandle::ColumnType::kRegular; case protocol::ColumnType::SYNTHESIZED: - return connector::hive::HiveColumnHandle::ColumnType::kSynthesized; + return velox::connector::hive::HiveColumnHandle::ColumnType::kSynthesized; default: VELOX_UNSUPPORTED( "Unsupported Hive column type: {}.", toJsonString(type)); @@ -783,7 +787,7 @@ std::unique_ptr toFilter( VELOX_UNSUPPORTED("Unsupported filter found."); } -std::unique_ptr toHiveTableHandle( +std::unique_ptr toHiveTableHandle( const protocol::TupleDomain& domainPredicate, const std::shared_ptr& remainingPredicate, bool isPushdownFilterEnabled, @@ -793,7 +797,7 @@ std::unique_ptr toHiveTableHandle( const protocol::Map& tableParameters, const VeloxExprConverter& exprConverter, const TypeParser& typeParser) { - connector::hive::SubfieldFilters subfieldFilters; + velox::connector::hive::SubfieldFilters subfieldFilters; auto domains = domainPredicate.domains; for (const auto& domain : *domains) { auto filter = domain.second; @@ -833,7 +837,7 @@ std::unique_ptr toHiveTableHandle( } if (tableParameters.empty()) { - return std::make_unique( + return std::make_unique( tableHandle.connectorId, tableName, isPushdownFilterEnabled, @@ -848,7 +852,7 @@ std::unique_ptr toHiveTableHandle( finalTableParameters[key] = value; } - return std::make_unique( + return std::make_unique( tableHandle.connectorId, tableName, isPushdownFilterEnabled, @@ -858,7 +862,7 @@ std::unique_ptr toHiveTableHandle( 
finalTableParameters); } -connector::hive::LocationHandle::TableType toTableType( +velox::connector::hive::LocationHandle::TableType toTableType( protocol::TableType tableType) { switch (tableType) { case protocol::TableType::NEW: @@ -866,17 +870,17 @@ connector::hive::LocationHandle::TableType toTableType( // So they can be treated as New. They do not require Append or Overwrite // semantics as applicable for regular tables. case protocol::TableType::TEMPORARY: - return connector::hive::LocationHandle::TableType::kNew; + return velox::connector::hive::LocationHandle::TableType::kNew; case protocol::TableType::EXISTING: - return connector::hive::LocationHandle::TableType::kExisting; + return velox::connector::hive::LocationHandle::TableType::kExisting; default: VELOX_UNSUPPORTED("Unsupported table type: {}.", toJsonString(tableType)); } } -std::shared_ptr toLocationHandle( +std::shared_ptr toLocationHandle( const protocol::LocationHandle& locationHandle) { - return std::make_shared( + return std::make_shared( locationHandle.targetPath, locationHandle.writePath, toTableType(locationHandle.tableType)); @@ -975,8 +979,8 @@ toHiveSortingColumns(const protocol::List& sortedBy) { std::shared_ptr toHiveBucketProperty( - const std::vector>& - inputColumns, + const std::vector>& inputColumns, const std::shared_ptr& bucketProperty, const TypeParser& typeParser) { if (bucketProperty == nullptr) { @@ -1248,7 +1252,7 @@ HivePrestoToVeloxConnector::toVeloxInsertTableHandle( const auto table = hiveInsertTableHandle->pageSinkMetadata.table; VELOX_USER_CHECK_NOT_NULL(table, "Table must not be null for insert query"); - return std::make_unique( + return std::make_unique( inputColumns, toLocationHandle(hiveInsertTableHandle->locationHandle), toFileFormat(hiveInsertTableHandle->tableStorageFormat, "TableWrite"), @@ -1261,20 +1265,20 @@ HivePrestoToVeloxConnector::toVeloxInsertTableHandle( table->storage.serdeParameters.end())); } -std::vector> +std::vector> 
HivePrestoToVeloxConnector::toHiveColumns( const protocol::List& inputColumns, const TypeParser& typeParser, bool& hasPartitionColumn) const { hasPartitionColumn = false; - std::vector> + std::vector> hiveColumns; hiveColumns.reserve(inputColumns.size()); for (const auto& columnHandle : inputColumns) { hasPartitionColumn |= columnHandle.columnType == protocol::ColumnType::PARTITION_KEY; hiveColumns.emplace_back( - std::dynamic_pointer_cast( + std::dynamic_pointer_cast( std::shared_ptr(toVeloxColumnHandle(&columnHandle, typeParser)))); } return hiveColumns; @@ -1299,7 +1303,7 @@ HivePrestoToVeloxConnector::createVeloxPartitionFunctionSpec( "Unsupported Hive bucket function type: {}", toJsonString(hivePartitioningHandle->bucketFunctionType)); effectivelyGather = hivePartitioningHandle->bucketCount == 1; - return std::make_unique( + return std::make_unique( hivePartitioningHandle->bucketCount, bucketToPartition, channels, @@ -1360,7 +1364,7 @@ IcebergPrestoToVeloxConnector::toVeloxSplit( {"$data_sequence_number", std::to_string(icebergSplit->dataSequenceNumber)}); - return std::make_unique( + return std::make_unique( catalogId, icebergSplit->path, toVeloxFileFormat(icebergSplit->fileFormat), @@ -1385,7 +1389,7 @@ IcebergPrestoToVeloxConnector::toVeloxColumnHandle( // TODO(imjalpreet): Modify 'hiveType' argument of the 'HiveColumnHandle' // constructor similar to how Hive Connector is handling for bucketing velox::type::fbhive::HiveTypeParser hiveTypeParser; - return std::make_unique( + return std::make_unique( icebergColumn->columnIdentity.name, toHiveColumnType(icebergColumn->columnType), stringToType(icebergColumn->type, typeParser), @@ -1470,7 +1474,7 @@ TpchPrestoToVeloxConnector::toVeloxSplit( auto tpchSplit = dynamic_cast(connectorSplit); VELOX_CHECK_NOT_NULL( tpchSplit, "Unexpected split type {}", connectorSplit->_type); - return std::make_unique( + return std::make_unique( catalogId, tpchSplit->totalParts, tpchSplit->partNumber); } @@ -1481,7 +1485,7 @@ 
TpchPrestoToVeloxConnector::toVeloxColumnHandle( auto tpchColumn = dynamic_cast(column); VELOX_CHECK_NOT_NULL( tpchColumn, "Unexpected column handle type {}", column->_type); - return std::make_unique( + return std::make_unique( tpchColumn->columnName); } @@ -1500,7 +1504,7 @@ TpchPrestoToVeloxConnector::toVeloxTableHandle( tpchLayout, "Unexpected layout type {}", tableHandle.connectorTableLayout->_type); - return std::make_unique( + return std::make_unique( tableHandle.connectorId, tpch::fromTableName(tpchLayout->table.tableName), tpchLayout->table.scaleFactor); @@ -1510,4 +1514,52 @@ std::unique_ptr TpchPrestoToVeloxConnector::createConnectorProtocol() const { return std::make_unique(); } + +std::unique_ptr +TpcdsPrestoToVeloxConnector::toVeloxSplit( + const protocol::ConnectorId& catalogId, + const protocol::ConnectorSplit* const connectorSplit) const { + auto tpcdsSplit = dynamic_cast(connectorSplit); + VELOX_CHECK_NOT_NULL( + tpcdsSplit, "Unexpected split type {}", connectorSplit->_type); + return std::make_unique( + catalogId, tpcdsSplit->totalParts, tpcdsSplit->partNumber); +} + +std::unique_ptr +TpcdsPrestoToVeloxConnector::toVeloxColumnHandle( + const protocol::ColumnHandle* column, + const TypeParser& typeParser) const { + auto tpcdsColumn = dynamic_cast(column); + VELOX_CHECK_NOT_NULL( + tpcdsColumn, "Unexpected column handle type {}", column->_type); + return std::make_unique( + tpcdsColumn->columnName); +} + +std::unique_ptr +TpcdsPrestoToVeloxConnector::toVeloxTableHandle( + const protocol::TableHandle& tableHandle, + const VeloxExprConverter& exprConverter, + const TypeParser& typeParser, + std::unordered_map< + std::string, + std::shared_ptr>& assignments) const { + auto tpcdsLayout = + std::dynamic_pointer_cast( + tableHandle.connectorTableLayout); + VELOX_CHECK_NOT_NULL( + tpcdsLayout, + "Unexpected layout type {}", + tableHandle.connectorTableLayout->_type); + return std::make_unique( + tableHandle.connectorId, + 
presto::connector::tpcds::fromTableName(tpcdsLayout->table.tableName), + tpcdsLayout->table.scaleFactor); +} + +std::unique_ptr +TpcdsPrestoToVeloxConnector::createConnectorProtocol() const { + return std::make_unique(); +} } // namespace facebook::presto diff --git a/presto-native-execution/presto_cpp/main/types/PrestoToVeloxConnector.h b/presto-native-execution/presto_cpp/main/types/PrestoToVeloxConnector.h index 754aaeddbef05..f00348805b456 100644 --- a/presto-native-execution/presto_cpp/main/types/PrestoToVeloxConnector.h +++ b/presto-native-execution/presto_cpp/main/types/PrestoToVeloxConnector.h @@ -210,4 +210,30 @@ class TpchPrestoToVeloxConnector final : public PrestoToVeloxConnector { std::unique_ptr createConnectorProtocol() const final; }; + +class TpcdsPrestoToVeloxConnector final : public PrestoToVeloxConnector { + public: + explicit TpcdsPrestoToVeloxConnector(std::string connectorId) + : PrestoToVeloxConnector(std::move(connectorId)) {} + + std::unique_ptr toVeloxSplit( + const protocol::ConnectorId& catalogId, + const protocol::ConnectorSplit* connectorSplit) const final; + + std::unique_ptr toVeloxColumnHandle( + const protocol::ColumnHandle* column, + const TypeParser& typeParser) const final; + + std::unique_ptr toVeloxTableHandle( + const protocol::TableHandle& tableHandle, + const VeloxExprConverter& exprConverter, + const TypeParser& typeParser, + std::unordered_map< + std::string, + std::shared_ptr>& assignments) + const final; + + std::unique_ptr createConnectorProtocol() + const final; +}; } // namespace facebook::presto diff --git a/presto-native-execution/presto_cpp/main/types/tests/CMakeLists.txt b/presto-native-execution/presto_cpp/main/types/tests/CMakeLists.txt index 4cd79ed1bf5bb..f10efab107253 100644 --- a/presto-native-execution/presto_cpp/main/types/tests/CMakeLists.txt +++ b/presto-native-execution/presto_cpp/main/types/tests/CMakeLists.txt @@ -20,6 +20,7 @@ target_link_libraries( GTest::gtest_main presto_operators 
presto_protocol + presto_tpcds_connector velox_hive_connector velox_tpch_connector velox_exec @@ -48,6 +49,7 @@ target_link_libraries( $ $ presto_operators + presto_tpcds_connector velox_core velox_dwio_common_exception velox_encode @@ -83,6 +85,7 @@ target_link_libraries( presto_to_velox_connector_test presto_protocol presto_operators + presto_tpcds_connector presto_type_converter presto_types velox_hive_connector diff --git a/presto-native-execution/presto_cpp/presto_protocol/ConnectorProtocol.h b/presto-native-execution/presto_cpp/presto_protocol/ConnectorProtocol.h index b91131791bb0c..9108aa1577b2c 100644 --- a/presto-native-execution/presto_cpp/presto_protocol/ConnectorProtocol.h +++ b/presto-native-execution/presto_cpp/presto_protocol/ConnectorProtocol.h @@ -272,4 +272,15 @@ using SystemConnectorProtocol = ConnectorProtocolTemplate< SystemTransactionHandle, NotImplemented>; +using TpcdsConnectorProtocol = ConnectorProtocolTemplate< + TpcdsTableHandle, + TpcdsTableLayoutHandle, + TpcdsColumnHandle, + NotImplemented, + NotImplemented, + TpcdsSplit, + TpcdsPartitioningHandle, + TpcdsTransactionHandle, + NotImplemented>; + } // namespace facebook::presto::protocol diff --git a/presto-native-execution/presto_cpp/presto_protocol/presto_protocol.cpp b/presto-native-execution/presto_cpp/presto_protocol/presto_protocol.cpp index 4fa8486e6f92f..2fd0e140c44f1 100644 --- a/presto-native-execution/presto_cpp/presto_protocol/presto_protocol.cpp +++ b/presto-native-execution/presto_cpp/presto_protocol/presto_protocol.cpp @@ -991,6 +991,36 @@ void from_json(const json& j, std::shared_ptr& p) { * limitations under the License. */ +// TpcdsTransactionHandle is special since +// the corresponding class in Java is an enum. 
+ +namespace facebook::presto::protocol { + +void to_json(json& j, const TpcdsTransactionHandle& p) { + j = json::array(); + j.push_back(p._type); + j.push_back(p.instance); +} + +void from_json(const json& j, TpcdsTransactionHandle& p) { + j[0].get_to(p._type); + j[1].get_to(p.instance); +} +} // namespace facebook::presto::protocol +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + // TpchTransactionHandle is special since // the corresponding class in Java is an enum. @@ -1022,6 +1052,7 @@ void from_json(const json& j, TpchTransactionHandle& p) { */ // dependency TpchTransactionHandle +// dependency TpcdsTransactionHandle namespace facebook::presto::protocol { void to_json(json& j, const std::shared_ptr& p) { @@ -13845,6 +13876,178 @@ void from_json(const json& j, TopNRowNumberNode& p) { } } // namespace facebook::presto::protocol namespace facebook::presto::protocol { +TpcdsColumnHandle::TpcdsColumnHandle() noexcept { + _type = "tpcds"; +} + +void to_json(json& j, const TpcdsColumnHandle& p) { + j = json::object(); + j["@type"] = "tpcds"; + to_json_key( + j, + "columnName", + p.columnName, + "TpcdsColumnHandle", + "String", + "columnName"); + to_json_key(j, "type", p.type, "TpcdsColumnHandle", "Type", "type"); +} + +void from_json(const json& j, TpcdsColumnHandle& p) { + p._type = j["@type"]; + from_json_key( + j, + "columnName", + p.columnName, + "TpcdsColumnHandle", + "String", + "columnName"); + from_json_key(j, "type", p.type, 
"TpcdsColumnHandle", "Type", "type"); +} +} // namespace facebook::presto::protocol +namespace facebook::presto::protocol { +TpcdsPartitioningHandle::TpcdsPartitioningHandle() noexcept { + _type = "tpcds"; +} + +void to_json(json& j, const TpcdsPartitioningHandle& p) { + j = json::object(); + j["@type"] = "tpcds"; + to_json_key( + j, "table", p.table, "TpcdsPartitioningHandle", "String", "table"); + to_json_key( + j, + "totalRows", + p.totalRows, + "TpcdsPartitioningHandle", + "int64_t", + "totalRows"); +} + +void from_json(const json& j, TpcdsPartitioningHandle& p) { + p._type = j["@type"]; + from_json_key( + j, "table", p.table, "TpcdsPartitioningHandle", "String", "table"); + from_json_key( + j, + "totalRows", + p.totalRows, + "TpcdsPartitioningHandle", + "int64_t", + "totalRows"); +} +} // namespace facebook::presto::protocol +namespace facebook::presto::protocol { +TpcdsTableHandle::TpcdsTableHandle() noexcept { + _type = "tpcds"; +} + +void to_json(json& j, const TpcdsTableHandle& p) { + j = json::object(); + j["@type"] = "tpcds"; + to_json_key( + j, "tableName", p.tableName, "TpcdsTableHandle", "String", "tableName"); + to_json_key( + j, + "scaleFactor", + p.scaleFactor, + "TpcdsTableHandle", + "double", + "scaleFactor"); +} + +void from_json(const json& j, TpcdsTableHandle& p) { + p._type = j["@type"]; + from_json_key( + j, "tableName", p.tableName, "TpcdsTableHandle", "String", "tableName"); + from_json_key( + j, + "scaleFactor", + p.scaleFactor, + "TpcdsTableHandle", + "double", + "scaleFactor"); +} +} // namespace facebook::presto::protocol +namespace facebook::presto::protocol { +TpcdsSplit::TpcdsSplit() noexcept { + _type = "tpcds"; +} + +void to_json(json& j, const TpcdsSplit& p) { + j = json::object(); + j["@type"] = "tpcds"; + to_json_key( + j, + "tableHandle", + p.tableHandle, + "TpcdsSplit", + "TpcdsTableHandle", + "tableHandle"); + to_json_key(j, "partNumber", p.partNumber, "TpcdsSplit", "int", "partNumber"); + to_json_key(j, "totalParts", 
p.totalParts, "TpcdsSplit", "int", "totalParts"); + to_json_key( + j, + "addresses", + p.addresses, + "TpcdsSplit", + "List", + "addresses"); + to_json_key(j, "noSexism", p.noSexism, "TpcdsSplit", "bool", "noSexism"); +} + +void from_json(const json& j, TpcdsSplit& p) { + p._type = j["@type"]; + from_json_key( + j, + "tableHandle", + p.tableHandle, + "TpcdsSplit", + "TpcdsTableHandle", + "tableHandle"); + from_json_key( + j, "partNumber", p.partNumber, "TpcdsSplit", "int", "partNumber"); + from_json_key( + j, "totalParts", p.totalParts, "TpcdsSplit", "int", "totalParts"); + from_json_key( + j, + "addresses", + p.addresses, + "TpcdsSplit", + "List", + "addresses"); + from_json_key(j, "noSexism", p.noSexism, "TpcdsSplit", "bool", "noSexism"); +} +} // namespace facebook::presto::protocol +namespace facebook::presto::protocol { +TpcdsTableLayoutHandle::TpcdsTableLayoutHandle() noexcept { + _type = "tpcds"; +} + +void to_json(json& j, const TpcdsTableLayoutHandle& p) { + j = json::object(); + j["@type"] = "tpcds"; + to_json_key( + j, + "table", + p.table, + "TpcdsTableLayoutHandle", + "TpcdsTableHandle", + "table"); +} + +void from_json(const json& j, TpcdsTableLayoutHandle& p) { + p._type = j["@type"]; + from_json_key( + j, + "table", + p.table, + "TpcdsTableLayoutHandle", + "TpcdsTableHandle", + "table"); +} +} // namespace facebook::presto::protocol +namespace facebook::presto::protocol { TpchColumnHandle::TpchColumnHandle() noexcept { _type = "tpch"; } diff --git a/presto-native-execution/presto_cpp/presto_protocol/presto_protocol.h b/presto-native-execution/presto_cpp/presto_protocol/presto_protocol.h index 1bcc6a7a0ae1e..481bf26ca29a9 100644 --- a/presto-native-execution/presto_cpp/presto_protocol/presto_protocol.h +++ b/presto-native-execution/presto_cpp/presto_protocol/presto_protocol.h @@ -477,6 +477,34 @@ struct AllOrNoneValueSet : public ValueSet { }; void to_json(json& j, const AllOrNoneValueSet& p); void from_json(const json& j, AllOrNoneValueSet& p); +} 
// namespace facebook::presto::protocol +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// TpcdsTransactionHandle is special since +// the corresponding class in Java is an enum. + +namespace facebook::presto::protocol { + +struct TpcdsTransactionHandle : public ConnectorTransactionHandle { + String instance = {}; +}; + +void to_json(json& j, const TpcdsTransactionHandle& p); + +void from_json(const json& j, TpcdsTransactionHandle& p); + } // namespace facebook::presto::protocol /* * Licensed under the Apache License, Version 2.0 (the "License"); @@ -3159,6 +3187,58 @@ struct TopNRowNumberNode : public PlanNode { void to_json(json& j, const TopNRowNumberNode& p); void from_json(const json& j, TopNRowNumberNode& p); } // namespace facebook::presto::protocol +namespace facebook::presto::protocol { +struct TpcdsColumnHandle : public ColumnHandle { + String columnName = {}; + Type type = {}; + + TpcdsColumnHandle() noexcept; +}; +void to_json(json& j, const TpcdsColumnHandle& p); +void from_json(const json& j, TpcdsColumnHandle& p); +} // namespace facebook::presto::protocol +namespace facebook::presto::protocol { +struct TpcdsPartitioningHandle : public ConnectorPartitioningHandle { + String table = {}; + int64_t totalRows = {}; + + TpcdsPartitioningHandle() noexcept; +}; +void to_json(json& j, const TpcdsPartitioningHandle& p); +void from_json(const json& j, TpcdsPartitioningHandle& p); +} // namespace facebook::presto::protocol +namespace 
facebook::presto::protocol { +struct TpcdsTableHandle : public ConnectorTableHandle { + String tableName = {}; + double scaleFactor = {}; + + TpcdsTableHandle() noexcept; +}; +void to_json(json& j, const TpcdsTableHandle& p); +void from_json(const json& j, TpcdsTableHandle& p); +} // namespace facebook::presto::protocol +namespace facebook::presto::protocol { +struct TpcdsSplit : public ConnectorSplit { + TpcdsTableHandle tableHandle = {}; + int partNumber = {}; + int totalParts = {}; + List addresses = {}; + bool noSexism = {}; + + TpcdsSplit() noexcept; +}; +void to_json(json& j, const TpcdsSplit& p); +void from_json(const json& j, TpcdsSplit& p); +} // namespace facebook::presto::protocol +namespace facebook::presto::protocol { +struct TpcdsTableLayoutHandle : public ConnectorTableLayoutHandle { + TpcdsTableHandle table = {}; + + TpcdsTableLayoutHandle() noexcept; +}; +void to_json(json& j, const TpcdsTableLayoutHandle& p); +void from_json(const json& j, TpcdsTableLayoutHandle& p); +} // namespace facebook::presto::protocol /* * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
diff --git a/presto-native-execution/presto_cpp/presto_protocol/presto_protocol.yml b/presto-native-execution/presto_cpp/presto_protocol/presto_protocol.yml index ca4e30ae411e3..d574b72bb6d4a 100644 --- a/presto-native-execution/presto_cpp/presto_protocol/presto_protocol.yml +++ b/presto-native-execution/presto_cpp/presto_protocol/presto_protocol.yml @@ -53,6 +53,7 @@ AbstractClasses: - { name: IcebergColumnHandle, key: hive-iceberg } - { name: TpchColumnHandle, key: tpch } - { name: SystemColumnHandle, key: $system@system } + - { name: TpcdsColumnHandle, key: tpcds } ConnectorPartitioningHandle: super: JsonEncodedSubclass @@ -60,6 +61,7 @@ AbstractClasses: - { name: SystemPartitioningHandle, key: $remote } - { name: HivePartitioningHandle, key: hive} - { name: TpchPartitioningHandle, key: tpch} + - { name: TpcdsPartitioningHandle, key: tpcds} ConnectorTableHandle: super: JsonEncodedSubclass @@ -68,6 +70,7 @@ AbstractClasses: - { name: IcebergTableHandle, key: hive-iceberg } - { name: TpchTableHandle, key: tpch } - { name: SystemTableHandle, key: $system@system } + - { name: TpcdsTableHandle, key: tpcds } ConnectorOutputTableHandle: super: JsonEncodedSubclass @@ -87,6 +90,8 @@ AbstractClasses: - { name: HiveTransactionHandle, key: hive } - { name: RemoteTransactionHandle, key: $remote } - { name: SystemTransactionHandle, key: $system@system } + - { name: TpchTransactionHandle, key: tpch } + - { name: TpcdsTransactionHandle, key: tpcds } ConnectorTableLayoutHandle: super: JsonEncodedSubclass @@ -95,6 +100,7 @@ AbstractClasses: - { name: IcebergTableLayoutHandle, key: hive-iceberg } - { name: TpchTableLayoutHandle, key: tpch } - { name: SystemTableLayoutHandle, key: $system@system } + - { name: TpcdsTableLayoutHandle, key: tpcds } ConnectorMetadataUpdateHandle: super: JsonEncodedSubclass @@ -107,6 +113,7 @@ AbstractClasses: - { name: HiveSplit, key: hive } - { name: IcebergSplit, key: hive-iceberg } - { name: TpchSplit, key: tpch } + - { name: TpcdsSplit, key: tpcds 
} - { name: RemoteSplit, key: $remote } - { name: EmptySplit, key: $empty } - { name: SystemSplit, key: $system@system } @@ -272,6 +279,11 @@ JavaClasses: - presto-tpch/src/main/java/com/facebook/presto/tpch/TpchTableLayoutHandle.java - presto-tpch/src/main/java/com/facebook/presto/tpch/TpchColumnHandle.java - presto-tpch/src/main/java/com/facebook/presto/tpch/TpchPartitioningHandle.java + - presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsSplit.java + - presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsTableHandle.java + - presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsTableLayoutHandle.java + - presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsColumnHandle.java + - presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsPartitioningHandle.java - presto-spi/src/main/java/com/facebook/presto/spi/plan/LimitNode.java - presto-spi/src/main/java/com/facebook/presto/spi/function/LongVariableConstraint.java - presto-common/src/main/java/com/facebook/presto/common/predicate/Marker.java diff --git a/presto-native-execution/presto_cpp/presto_protocol/special/ConnectorTransactionHandle.cpp.inc b/presto-native-execution/presto_cpp/presto_protocol/special/ConnectorTransactionHandle.cpp.inc index 8ec2a94e84bd9..1b89a65615b1a 100644 --- a/presto-native-execution/presto_cpp/presto_protocol/special/ConnectorTransactionHandle.cpp.inc +++ b/presto-native-execution/presto_cpp/presto_protocol/special/ConnectorTransactionHandle.cpp.inc @@ -13,6 +13,7 @@ */ // dependency TpchTransactionHandle +// dependency TpcdsTransactionHandle namespace facebook::presto::protocol { void to_json(json& j, const std::shared_ptr& p) { diff --git a/presto-native-execution/presto_cpp/presto_protocol/special/TpcdsTransactionHandle.cpp.inc b/presto-native-execution/presto_cpp/presto_protocol/special/TpcdsTransactionHandle.cpp.inc new file mode 100644 index 0000000000000..f741669d72dc7 --- /dev/null +++ 
b/presto-native-execution/presto_cpp/presto_protocol/special/TpcdsTransactionHandle.cpp.inc @@ -0,0 +1,30 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// TpcdsTransactionHandle is special since +// the corresponding class in Java is an enum. + +namespace facebook::presto::protocol { + +void to_json(json& j, const TpcdsTransactionHandle& p) { + j = json::array(); + j.push_back(p._type); + j.push_back(p.instance); +} + +void from_json(const json& j, TpcdsTransactionHandle& p) { + j[0].get_to(p._type); + j[1].get_to(p.instance); +} +} // namespace facebook::presto::protocol diff --git a/presto-native-execution/presto_cpp/presto_protocol/special/TpcdsTransactionHandle.hpp.inc b/presto-native-execution/presto_cpp/presto_protocol/special/TpcdsTransactionHandle.hpp.inc new file mode 100644 index 0000000000000..d665925648813 --- /dev/null +++ b/presto-native-execution/presto_cpp/presto_protocol/special/TpcdsTransactionHandle.hpp.inc @@ -0,0 +1,28 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// TpcdsTransactionHandle is special since +// the corresponding class in Java is an enum. + +namespace facebook::presto::protocol { + +struct TpcdsTransactionHandle : public ConnectorTransactionHandle { + String instance = {}; +}; + +void to_json(json& j, const TpcdsTransactionHandle& p); + +void from_json(const json& j, TpcdsTransactionHandle& p); + +} // namespace facebook::presto::protocol diff --git a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/AbstractTestNativeTpcdsConnectorQueries.java b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/AbstractTestNativeTpcdsConnectorQueries.java new file mode 100644 index 0000000000000..6979598e25454 --- /dev/null +++ b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/AbstractTestNativeTpcdsConnectorQueries.java @@ -0,0 +1,83 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.nativeworker; + +import com.facebook.presto.Session; +import com.facebook.presto.tests.AbstractTestQueryFramework; +import org.testng.annotations.Test; + +public abstract class AbstractTestNativeTpcdsConnectorQueries + extends AbstractTestQueryFramework +{ + @Override + public Session getSession() + { + return Session.builder(super.getSession()).setCatalog("tpcds").setSchema("tiny").build(); + } + + @Test + public void testTpcdsTinyTablesRowCount() + { + Session session = getSession(); + assertQuery(session, "SELECT count(*) FROM catalog_returns"); + assertQuery(session, "SELECT count(*) FROM catalog_sales"); + assertQuery(session, "SELECT count(*) FROM store_sales"); + assertQuery(session, "SELECT count(*) FROM store_returns"); + assertQuery(session, "SELECT count(*) FROM web_sales"); + assertQuery(session, "SELECT count(*) FROM web_returns"); + assertQuery(session, "SELECT count(*) FROM inventory"); + assertQuery(session, "SELECT count(*) FROM item"); + assertQuery(session, "SELECT count(*) FROM customer_address"); + assertQuery(session, "SELECT count(*) FROM customer_demographics"); + assertQuery(session, "SELECT count(*) FROM call_center"); + assertQuery(session, "SELECT count(*) FROM customer"); + assertQuery(session, "SELECT count(*) FROM web_site"); + assertQuery(session, "SELECT count(*) FROM web_page"); + assertQuery(session, "SELECT count(*) FROM promotion"); + assertQuery(session, "SELECT count(*) FROM reason"); + assertQuery(session, "SELECT count(*) FROM store"); + assertQuery(session, "SELECT count(*) FROM income_band"); + assertQuery(session, "SELECT count(*) FROM household_demographics"); + assertQuery(session, "SELECT count(*) FROM warehouse"); + assertQuery(session, "SELECT count(*) FROM catalog_page"); + assertQuery(session, "SELECT count(*) FROM date_dim"); + assertQuery(session, "SELECT count(*) FROM time_dim"); + assertQuery(session, "SELECT count(*) FROM ship_mode"); + } + + @Test + public void 
testTpcdsBasicQueries() + { + Session session = getSession(); + assertQuery(session, "SELECT cc_call_center_sk, cc_name, cc_manager, cc_mkt_id, trim(cast(cc_mkt_class as varchar)) FROM call_center"); + assertQuery(session, "SELECT ss_store_sk, SUM(ss_net_paid) AS total_sales " + + "FROM store_sales GROUP BY ss_store_sk ORDER BY total_sales DESC LIMIT 10"); + assertQuery(session, "SELECT sr_item_sk, SUM(sr_return_quantity) AS total_returns " + + "FROM store_returns WHERE sr_item_sk = 12345 GROUP BY sr_item_sk"); + assertQuery(session, "SELECT ws_order_number, SUM(ws_net_paid) AS total_paid FROM web_sales " + + "WHERE ws_sold_date_sk BETWEEN 2451180 AND 2451545 GROUP BY ws_order_number"); + assertQuery(session, "SELECT inv_item_sk, inv_quantity_on_hand FROM inventory WHERE inv_quantity_on_hand > 1000 " + + "ORDER BY inv_quantity_on_hand DESC"); + assertQuery(session, "SELECT SUM(ss_net_paid) AS total_revenue FROM store_sales, promotion " + + "WHERE p_promo_sk = 100 GROUP BY p_promo_sk"); + assertQuery(session, "SELECT trim(cast(c.c_customer_id as varchar)) FROM customer c " + + "JOIN customer_demographics cd ON c.c_customer_sk = cd.cd_demo_sk WHERE cd_purchase_estimate > 5000"); + assertQuery(session, "SELECT trim(cast(cd_gender as varchar)), AVG(cd_purchase_estimate) AS avg_purchase_estimate FROM customer_demographics" + + " GROUP BY cd_gender ORDER BY avg_purchase_estimate DESC"); + + // No row passes the filter. 
+ assertQuery(session, + "SELECT s_store_sk, s_store_id, s_number_employees FROM store WHERE s_number_employees > 1000"); + } +} diff --git a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/PrestoNativeQueryRunnerUtils.java b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/PrestoNativeQueryRunnerUtils.java index 69bb2433b3f18..f0953b9a4e3a2 100644 --- a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/PrestoNativeQueryRunnerUtils.java +++ b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/PrestoNativeQueryRunnerUtils.java @@ -478,6 +478,10 @@ public static Optional> getExternalWorkerLaunc Files.write(catalogDirectoryPath.resolve("tpchstandard.properties"), format("connector.name=tpch%n").getBytes()); + // Add a tpcds catalog. + Files.write(catalogDirectoryPath.resolve("tpcds.properties"), + format("connector.name=tpcds%n").getBytes()); + // Disable stack trace capturing as some queries (using TRY) generate a lot of exceptions. return new ProcessBuilder(prestoServerPath, "--logtostderr=1", "--v=1") .directory(tempDirectoryPath.toFile()) diff --git a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestPrestoNativeTpcdsConnectorQueries.java b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestPrestoNativeTpcdsConnectorQueries.java new file mode 100644 index 0000000000000..50c3cc3a10037 --- /dev/null +++ b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestPrestoNativeTpcdsConnectorQueries.java @@ -0,0 +1,35 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.nativeworker; + +import com.facebook.presto.testing.ExpectedQueryRunner; +import com.facebook.presto.testing.QueryRunner; + +/** Concrete runner for the queries inherited from AbstractTestNativeTpcdsConnectorQueries: the query runner comes from PrestoNativeQueryRunnerUtils.createNativeQueryRunner(true) and the expected-query runner from PrestoNativeQueryRunnerUtils.createJavaQueryRunner(). NOTE(review): the meaning of the boolean argument is not visible here - confirm against PrestoNativeQueryRunnerUtils. */ public class TestPrestoNativeTpcdsConnectorQueries + extends AbstractTestNativeTpcdsConnectorQueries +{ + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + return PrestoNativeQueryRunnerUtils.createNativeQueryRunner(true); + } + + @Override + protected ExpectedQueryRunner createExpectedQueryRunner() + throws Exception + { + return PrestoNativeQueryRunnerUtils.createJavaQueryRunner(); + } +} diff --git a/presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsSplitManager.java b/presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsSplitManager.java index 77cd902330026..10ae57007cb6a 100644 --- a/presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsSplitManager.java +++ b/presto-tpcds/src/main/java/com/facebook/presto/tpcds/TpcdsSplitManager.java @@ -24,10 +24,13 @@ import com.facebook.presto.spi.connector.ConnectorTransactionHandle; import com.google.common.collect.ImmutableList; +import java.util.HashSet; import java.util.Set; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; +import static java.util.Arrays.asList; +import static java.util.Collections.unmodifiableSet; import static java.util.Objects.requireNonNull; public class TpcdsSplitManager @@ -62,12 +65,24 @@ public ConnectorSplitSource getSplits( int totalParts = nodes.size() * splitsPerNode; int partNumber = 0; - //
Split the data using split and skew by the number of nodes available. + // For larger tables, split the data using split and skew by the number of nodes available. + // The TPCDS connector in presto native uses dsdgen-c for data generation. For certain smaller tables, + // the data cannot be generated in parallel. For these cases, a single split should be processed by + // only one of the worker nodes. + Set smallTables = unmodifiableSet(new HashSet<>(asList("call_center", "item", "store", "web_page", "web_site"))); ImmutableList.Builder splits = ImmutableList.builder(); - for (Node node : nodes) { - for (int i = 0; i < splitsPerNode; i++) { - splits.add(new TpcdsSplit(tableHandle, partNumber, totalParts, ImmutableList.of(node.getHostAndPort()), noSexism)); - partNumber++; + if (smallTables.contains(tableHandle.getTableName())) { + Node node = nodes.stream() + .findFirst() + .orElse(null); /* NOTE(review): node is null when nodes is empty and is dereferenced on the next statement; presumably an earlier check in this method rejects an empty node set - confirm, or fail explicitly here. */ + splits.add(new TpcdsSplit(tableHandle, 0, 1, ImmutableList.of(node.getHostAndPort()), noSexism)); + } + else { + for (Node node : nodes) { + for (int i = 0; i < splitsPerNode; i++) { + splits.add(new TpcdsSplit(tableHandle, partNumber, totalParts, ImmutableList.of(node.getHostAndPort()), noSexism)); + partNumber++; + } } } return new FixedSplitSource(splits.build());