From 68dd6ffd06cdc88872d49767529db2c286aad2d6 Mon Sep 17 00:00:00 2001 From: Pedro Eugenio Rocha Pedreira Date: Fri, 13 Dec 2024 13:27:25 -0800 Subject: [PATCH] Capture KeepAlive instead of Executor in WriterOptions (#115) Summary: Pull Request resolved: https://github.com/facebookincubator/nimble/pull/115 folly::Executor::KeepAlive<> is the recommended way of holding references to Executors, as they ensure the executor is kept alive until the KeepAlive object is destroyed. Because of this, some folly APIs can only return KeepAlive (and not shared_ptr), such as Global pools. These APIs cannot use WriterOption is it takes a shared_ptr Reviewed By: xiaoxmeng, HuamengJiang Differential Revision: D66741079 fbshipit-source-id: 715f25bd91f17abbd7006aff7d8e203deef39569 --- dwio/nimble/velox/VeloxWriterOptions.h | 24 ++++++++++++++++---- dwio/nimble/velox/tests/VeloxReaderTests.cpp | 17 +++++++++----- 2 files changed, 30 insertions(+), 11 deletions(-) diff --git a/dwio/nimble/velox/VeloxWriterOptions.h b/dwio/nimble/velox/VeloxWriterOptions.h index 2a1f202..4fbc842 100644 --- a/dwio/nimble/velox/VeloxWriterOptions.h +++ b/dwio/nimble/velox/VeloxWriterOptions.h @@ -126,11 +126,25 @@ struct VeloxWriterOptions { const velox::common::SpillConfig* spillConfig{nullptr}; - // If provided, internal encoding operations will happen in parallel using - // this executor. - std::shared_ptr encodingExecutor; - // If provided, internal ingestion operations will happen in parallel - std::shared_ptr writeExecutor; + // If provided, internal writing/encoding operations will happen in parallel + // using the specified executors. + // + // The KeepAlive wrappers ensures that the executor object will be kept alive + // (allocated), and that the pool will be open for receiving new tasks. A + // shared_ptr would only guarantee that the object is still allocated, but not + // necessarily open for new task (e.g. it could have been .join()'ed through a + // different reference). Because of that, many libraries only provide + // KeepAlive references to executors, not shared_ptr, so taking a KeepAlive + // also makes it more convenient to clients. + // + // As a result, if a KeepAlive is still being held, clients trying to destruct + // the last reference of a shared_ptr to that executor will block until all + // KeepAlive references are destructed. + // + // - encodingExecutor: execute stream encoding operations in parallel. + // - writeExecutor: execute FieldWriter::write() operations in parallel. + folly::Executor::KeepAlive<> encodingExecutor; + folly::Executor::KeepAlive<> writeExecutor; bool enableChunking = false; diff --git a/dwio/nimble/velox/tests/VeloxReaderTests.cpp b/dwio/nimble/velox/tests/VeloxReaderTests.cpp index 8b0cf02..8f67b59 100644 --- a/dwio/nimble/velox/tests/VeloxReaderTests.cpp +++ b/dwio/nimble/velox/tests/VeloxReaderTests.cpp @@ -2426,14 +2426,17 @@ TEST_F(VeloxReaderTests, FuzzSimple) { auto iterations = 20; auto batches = 20; std::mt19937 rng{seed}; + for (auto parallelismFactor : {0U, 1U, std::thread::hardware_concurrency()}) { LOG(INFO) << "Parallelism Factor: " << parallelismFactor; nimble::VeloxWriterOptions writerOptions; + std::shared_ptr executor; + if (parallelismFactor > 0) { - auto executor = + executor = std::make_shared(parallelismFactor); - writerOptions.encodingExecutor = executor; - writerOptions.writeExecutor = executor; + writerOptions.encodingExecutor = folly::getKeepAliveToken(*executor); + writerOptions.writeExecutor = folly::getKeepAliveToken(*executor); } for (auto i = 0; i < iterations; ++i) { @@ -2527,11 +2530,13 @@ TEST_F(VeloxReaderTests, FuzzComplex) { for (auto parallelismFactor : {0U, 1U, std::thread::hardware_concurrency()}) { LOG(INFO) << "Parallelism Factor: " << parallelismFactor; + std::shared_ptr executor; + if (parallelismFactor > 0) { - auto executor = + executor = std::make_shared(parallelismFactor); - writerOptions.encodingExecutor = executor; - writerOptions.writeExecutor = executor; + writerOptions.encodingExecutor = folly::getKeepAliveToken(*executor); + writerOptions.writeExecutor = folly::getKeepAliveToken(*executor); } for (auto i = 0; i < iterations; ++i) {