From d13e6e7fd5dd2b4d84a7dc5a04d701cd72473a0c Mon Sep 17 00:00:00 2001 From: Aidar Samerkhanov Date: Fri, 22 Dec 2023 17:15:26 +0300 Subject: [PATCH] YQL-17087: Add channel spilling to dq pipe communication (#612) * Add channel spilling to dq pipe communication * Add EvPoison handler to DqChannelStorageActor --- ydb/core/tx/datashard/datashard_kqp.cpp | 2 +- .../compute/dq_task_runner_exec_ctx.cpp | 6 +- .../actors/compute/dq_task_runner_exec_ctx.h | 2 +- .../dq/actors/spilling/channel_storage.cpp | 17 +- .../yql/dq/actors/spilling/channel_storage.h | 4 +- .../actors/spilling/channel_storage_actor.cpp | 277 +++++++++++++----- .../actors/spilling/channel_storage_actor.h | 22 +- ydb/library/yql/dq/runtime/dq_tasks_runner.h | 4 +- .../task_runner_actor/task_runner_actor.cpp | 124 +++++++- 9 files changed, 362 insertions(+), 96 deletions(-) diff --git a/ydb/core/tx/datashard/datashard_kqp.cpp b/ydb/core/tx/datashard/datashard_kqp.cpp index 2dc360b98aa5..9b3871ca82c3 100644 --- a/ydb/core/tx/datashard/datashard_kqp.cpp +++ b/ydb/core/tx/datashard/datashard_kqp.cpp @@ -1111,7 +1111,7 @@ class TKqpTaskRunnerExecutionContext : public NDq::IDqTaskRunnerExecutionContext return {}; } - NDq::IDqChannelStorage::TPtr CreateChannelStorage(ui64 /* channelId */, TActorSystem* /* actorSystem */) const override { + NDq::IDqChannelStorage::TPtr CreateChannelStorage(ui64 /* channelId */, TActorSystem* /* actorSystem */, bool /*isConcurrent*/) const override { return {}; } }; diff --git a/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.cpp b/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.cpp index 82ccd35b5844..9ce940a36aa5 100644 --- a/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.cpp @@ -14,12 +14,12 @@ TDqTaskRunnerExecutionContext::TDqTaskRunnerExecutionContext(TTxId txId, bool wi } IDqChannelStorage::TPtr TDqTaskRunnerExecutionContext::CreateChannelStorage(ui64 channelId) const { - return CreateChannelStorage(channelId, nullptr); + return CreateChannelStorage(channelId, nullptr, false); } -IDqChannelStorage::TPtr TDqTaskRunnerExecutionContext::CreateChannelStorage(ui64 channelId, NActors::TActorSystem* actorSystem) const { +IDqChannelStorage::TPtr TDqTaskRunnerExecutionContext::CreateChannelStorage(ui64 channelId, NActors::TActorSystem* actorSystem, bool isConcurrent) const { if (WithSpilling_) { - return CreateDqChannelStorage(TxId_, channelId, WakeUp_, actorSystem); + return CreateDqChannelStorage(TxId_, channelId, WakeUp_, actorSystem, isConcurrent); } else { return nullptr; } diff --git a/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.h b/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.h index 1c6272018769..0dedb3434bfd 100644 --- a/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.h +++ b/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.h @@ -12,7 +12,7 @@ class TDqTaskRunnerExecutionContext : public TDqTaskRunnerExecutionContextBase { TDqTaskRunnerExecutionContext(TTxId txId, bool withSpilling, IDqChannelStorage::TWakeUpCallback&& wakeUp); IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId) const override; - IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId, NActors::TActorSystem* actorSystem) const override; + IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId, NActors::TActorSystem* actorSystem, bool isConcurrent) const override; private: const TTxId TxId_; diff --git a/ydb/library/yql/dq/actors/spilling/channel_storage.cpp b/ydb/library/yql/dq/actors/spilling/channel_storage.cpp index 4b18d3766338..386813d2a2a0 100644 --- a/ydb/library/yql/dq/actors/spilling/channel_storage.cpp +++ b/ydb/library/yql/dq/actors/spilling/channel_storage.cpp @@ -22,13 +22,17 @@ namespace { class TDqChannelStorage : public IDqChannelStorage { public: - TDqChannelStorage(TTxId txId, ui64 channelId, TWakeUpCallback&& wakeUp, TActorSystem* actorSystem) { - SelfActor_ = CreateDqChannelStorageActor(txId, channelId, std::move(wakeUp), actorSystem); - TlsActivationContext->AsActorContext().RegisterWithSameMailbox(SelfActor_->GetActor()); + TDqChannelStorage(TTxId txId, ui64 channelId, TWakeUpCallback&& wakeUp, TActorSystem* actorSystem, bool isConcurrent) { + if (isConcurrent) { + SelfActor_ = CreateConcurrentDqChannelStorageActor(txId, channelId, std::move(wakeUp), actorSystem); + } else { + SelfActor_ = CreateDqChannelStorageActor(txId, channelId, std::move(wakeUp), actorSystem); + } + SelfActorId_ = TlsActivationContext->AsActorContext().RegisterWithSameMailbox(SelfActor_->GetActor()); } ~TDqChannelStorage() { - SelfActor_->Terminate(); + TlsActivationContext->AsActorContext().Send(SelfActorId_, new TEvents::TEvPoison); } bool IsEmpty() const override { @@ -49,13 +53,14 @@ class TDqChannelStorage : public IDqChannelStorage { private: IDqChannelStorageActor* SelfActor_; + TActorId SelfActorId_; }; } // anonymous namespace -IDqChannelStorage::TPtr CreateDqChannelStorage(TTxId txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback wakeUp, TActorSystem* actorSystem) +IDqChannelStorage::TPtr CreateDqChannelStorage(TTxId txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback wakeUp, TActorSystem* actorSystem, bool isConcurrent) { - return new TDqChannelStorage(txId, channelId, std::move(wakeUp), actorSystem); + return new TDqChannelStorage(txId, channelId, std::move(wakeUp), actorSystem, isConcurrent); } } // namespace NYql::NDq diff --git a/ydb/library/yql/dq/actors/spilling/channel_storage.h b/ydb/library/yql/dq/actors/spilling/channel_storage.h index 481afbcaa48a..71d3c6ea1f11 100644 --- a/ydb/library/yql/dq/actors/spilling/channel_storage.h +++ b/ydb/library/yql/dq/actors/spilling/channel_storage.h @@ -10,7 +10,7 @@ namespace NActors { namespace NYql::NDq { -NYql::NDq::IDqChannelStorage::TPtr CreateDqChannelStorage(TTxId txId, ui64 channelId, - NYql::NDq::IDqChannelStorage::TWakeUpCallback wakeUpCb, NActors::TActorSystem* actorSystem); +IDqChannelStorage::TPtr CreateDqChannelStorage(TTxId txId, ui64 channelId, + IDqChannelStorage::TWakeUpCallback wakeUpCb, NActors::TActorSystem* actorSystem, bool isConcurrent); } // namespace NYql::NDq diff --git a/ydb/library/yql/dq/actors/spilling/channel_storage_actor.cpp b/ydb/library/yql/dq/actors/spilling/channel_storage_actor.cpp index e98719734f3b..d64a49066170 100644 --- a/ydb/library/yql/dq/actors/spilling/channel_storage_actor.cpp +++ b/ydb/library/yql/dq/actors/spilling/channel_storage_actor.cpp @@ -34,33 +34,140 @@ namespace { constexpr ui32 MAX_INFLIGHT_BLOBS_COUNT = 10; constexpr ui64 MAX_INFLIGHT_BLOBS_SIZE = 50_MB; -class TDqChannelStorageActor : public IDqChannelStorageActor, - public NActors::TActorBootstrapped +class TDqChannelStorageActorBase : public IDqChannelStorageActor { - using TBase = TActorBootstrapped; public: - TDqChannelStorageActor(TTxId txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback&& wakeUp, NActors::TActorSystem* actorSystem) + ~TDqChannelStorageActorBase() = default; + + TDqChannelStorageActorBase(TTxId txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback&& wakeUp, TActorSystem* actorSystem) : TxId_(txId) , ChannelId_(channelId) , WakeUp_(std::move(wakeUp)) , ActorSystem_(actorSystem) {} + bool IsEmpty() override { + return WritingBlobs_.empty() && StoredBlobsCount_ == 0 && LoadedBlobs_.empty(); + } + + bool IsFull() override { + return WritingBlobs_.size() > MAX_INFLIGHT_BLOBS_COUNT || WritingBlobsSize_ > MAX_INFLIGHT_BLOBS_SIZE; + } + + [[nodiscard]] + const TMaybe& GetError() const { + return Error_; + } + +protected: + + void PutInternal(ui64 blobId, TRope&& blob, TActorIdentity selfActorId, ui64 cookie) { + FailOnError(); + + // TODO: timeout + // TODO: limit inflight events + + ui64 size = blob.size(); + + SendEvent(new TEvDqSpilling::TEvWrite(blobId, std::move(blob)), selfActorId, cookie); + + WritingBlobs_.emplace(blobId, size); + WritingBlobsSize_ += size; + } + + bool GetInternal(ui64 blobId, TBuffer& blob, TActorIdentity selfActorId, ui64 cookie) { + FailOnError(); + + auto loadedIt = LoadedBlobs_.find(blobId); + if (loadedIt != LoadedBlobs_.end()) { + YQL_ENSURE(loadedIt->second.size() != 0); + blob.Swap(loadedIt->second); + LoadedBlobs_.erase(loadedIt); + return true; + } + + auto result = LoadingBlobs_.emplace(blobId); + if (result.second) { + SendEvent(new TEvDqSpilling::TEvRead(blobId, true), selfActorId, cookie); + } + + return false; + } + + void FailOnError() { + if (Error_) { + LOG_E("Error: " << *Error_); + ythrow TDqChannelStorageException() << "TxId: " << TxId_ << ", channelId: " << ChannelId_ + << ", error: " << *Error_; + } + } + + template void SendEvent(T* event, TActorIdentity selfActorId, ui64 cookie) { + if (ActorSystem_) { + ActorSystem_->Send( + new IEventHandle( + SpillingActorId_, + selfActorId, + event, + /*flags=*/0, + cookie + ) + ); + } else { + selfActorId.Send(SpillingActorId_, event); + } + } + +protected: + const TTxId TxId_; + const ui64 ChannelId_; + IDqChannelStorage::TWakeUpCallback WakeUp_; + TActorId SpillingActorId_; + + TMap WritingBlobs_; // blobId -> blobSize + ui64 WritingBlobsSize_ = 0; + + ui32 StoredBlobsCount_ = 0; + ui64 StoredBlobsSize_ = 0; + + TSet LoadingBlobs_; + TMap LoadedBlobs_; + + TMaybe Error_; + + TActorSystem* ActorSystem_; +}; + +class TDqChannelStorageActor : public TDqChannelStorageActorBase, + public NActors::TActorBootstrapped +{ + using TBase = TActorBootstrapped; +public: + TDqChannelStorageActor(TTxId txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback&& wakeUp, NActors::TActorSystem* actorSystem) + : TDqChannelStorageActorBase(txId, channelId, std::move(wakeUp), actorSystem) + {} void Bootstrap() { auto spillingActor = CreateDqLocalFileSpillingActor(TxId_, TStringBuilder() << "ChannelId: " << ChannelId_, SelfId(), true); SpillingActorId_ = Register(spillingActor); - Become(&TDqChannelStorageActor::WorkState); } static constexpr char ActorName[] = "DQ_CHANNEL_STORAGE"; - NActors::IActor* GetActor() override { + IActor* GetActor() override { return this; } + void Put(ui64 blobId, TRope&& blob, ui64 cookie) override { + TDqChannelStorageActorBase::PutInternal(blobId, std::move(blob), SelfId(), cookie); + } + + bool Get(ui64 blobId, TBuffer& blob, ui64 cookie) override { + return TDqChannelStorageActorBase::GetInternal(blobId, blob, SelfId(), cookie); + } + protected: void PassAway() override { Send(SpillingActorId_, new TEvents::TEvPoison); @@ -73,6 +180,7 @@ class TDqChannelStorageActor : public IDqChannelStorageActor, hFunc(TEvDqSpilling::TEvWriteResult, HandleWork); hFunc(TEvDqSpilling::TEvReadResult, HandleWork); hFunc(TEvDqSpilling::TEvError, HandleWork); + cFunc(TEvents::TEvPoison::EventType, PassAway); default: Y_ABORT("TDqChannelStorageActor::WorkState unexpected event type: %" PRIx32 " event: %s", ev->GetTypeRewrite(), @@ -125,102 +233,121 @@ class TDqChannelStorageActor : public IDqChannelStorageActor, Error_.ConstructInPlace(msg.Message); } +}; +class TConcurrentDqChannelStorageActor : public TDqChannelStorageActorBase, + public NActors::TActorBootstrapped +{ + using TBase = TActorBootstrapped; public: - [[nodiscard]] - const TMaybe& GetError() const { - return Error_; - } + TConcurrentDqChannelStorageActor(TTxId txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback&& wakeUp, NActors::TActorSystem* actorSystem) + : TDqChannelStorageActorBase(txId, channelId, std::move(wakeUp), actorSystem) + {} - bool IsEmpty() const override { - return WritingBlobs_.empty() && StoredBlobsCount_ == 0 && LoadedBlobs_.empty(); - } - bool IsFull() const override { - return WritingBlobs_.size() > MAX_INFLIGHT_BLOBS_COUNT || WritingBlobsSize_ > MAX_INFLIGHT_BLOBS_SIZE; + void Bootstrap() { + auto spillingActor = CreateDqLocalFileSpillingActor(TxId_, TStringBuilder() << "ChannelId: " << ChannelId_, + SelfId(), true); + SpillingActorId_ = Register(spillingActor); + Become(&TConcurrentDqChannelStorageActor::WorkState); } - void Put(ui64 blobId, TRope&& blob, ui64 cookie) override { - FailOnError(); + static constexpr char ActorName[] = "DQ_CONCURRENT_CHANNEL_STORAGE"; - // TODO: timeout - // TODO: limit inflight events + IActor* GetActor() override { + return this; + } - ui64 size = blob.size(); + bool IsEmpty() override { + std::lock_guard lock(Mutex_); + return TDqChannelStorageActorBase::IsEmpty(); + } - SendEvent(new TEvDqSpilling::TEvWrite(blobId, std::move(blob)), cookie); + bool IsFull() override { + std::lock_guard lock(Mutex_); + return TDqChannelStorageActorBase::IsFull(); + } - WritingBlobs_.emplace(blobId, size); - WritingBlobsSize_ += size; + void Put(ui64 blobId, TRope&& blob, ui64 cookie) override { + std::lock_guard lock(Mutex_); + TDqChannelStorageActorBase::PutInternal(blobId, std::move(blob), SelfId(), cookie); } bool Get(ui64 blobId, TBuffer& blob, ui64 cookie) override { - FailOnError(); - - auto loadedIt = LoadedBlobs_.find(blobId); - if (loadedIt != LoadedBlobs_.end()) { - YQL_ENSURE(loadedIt->second.size() != 0); - blob.Swap(loadedIt->second); - LoadedBlobs_.erase(loadedIt); - return true; - } - - auto result = LoadingBlobs_.emplace(blobId); - if (result.second) { - SendEvent(new TEvDqSpilling::TEvRead(blobId, true), cookie); - } - - return false; + std::lock_guard lock(Mutex_); + return TDqChannelStorageActorBase::GetInternal(blobId, blob, SelfId(), cookie); } - void Terminate() override { - PassAway(); +protected: + void PassAway() override { + Send(SpillingActorId_, new TEvents::TEvPoison); + TBase::PassAway(); } private: - void FailOnError() { - if (Error_) { - LOG_E("Error: " << *Error_); - ythrow TDqChannelStorageException() << "TxId: " << TxId_ << ", channelId: " << ChannelId_ - << ", error: " << *Error_; + STATEFN(WorkState) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvDqSpilling::TEvWriteResult, HandleWork); + hFunc(TEvDqSpilling::TEvReadResult, HandleWork); + hFunc(TEvDqSpilling::TEvError, HandleWork); + cFunc(TEvents::TEvPoison::EventType, PassAway); + default: + Y_ABORT("TDqChannelStorageActor::WorkState unexpected event type: %" PRIx32 " event: %s", + ev->GetTypeRewrite(), + ev->ToString().data()); } } - template - void SendEvent(T* event, ui64 cookie) { - if (ActorSystem_) { - ActorSystem_->Send( - new IEventHandle( - SpillingActorId_, - SelfId(), - event, - /*flags=*/0, - cookie - ) - ); - } else { - Send(SpillingActorId_, event); + void HandleWork(TEvDqSpilling::TEvWriteResult::TPtr& ev) { + auto& msg = *ev->Get(); + LOG_T("[TEvWriteResult] blobId: " << msg.BlobId); + + std::lock_guard lock(Mutex_); + auto it = WritingBlobs_.find(msg.BlobId); + if (it == WritingBlobs_.end()) { + LOG_E("Got unexpected TEvWriteResult, blobId: " << msg.BlobId); + + Error_ = "Internal error"; + + Send(SpillingActorId_, new TEvents::TEvPoison); + return; } + + ui64 size = it->second; + WritingBlobsSize_ -= size; + WritingBlobs_.erase(it); + + StoredBlobsCount_++; + StoredBlobsSize_ += size; } -private: - const TTxId TxId_; - const ui64 ChannelId_; - IDqChannelStorage::TWakeUpCallback WakeUp_; - TActorId SpillingActorId_; + void HandleWork(TEvDqSpilling::TEvReadResult::TPtr& ev) { + auto& msg = *ev->Get(); + LOG_T("[TEvReadResult] blobId: " << msg.BlobId << ", size: " << msg.Blob.size()); - TMap WritingBlobs_; // blobId -> blobSize - ui64 WritingBlobsSize_ = 0; + std::lock_guard lock(Mutex_); + if (LoadingBlobs_.erase(msg.BlobId) != 1) { + LOG_E("[TEvReadResult] unexpected, blobId: " << msg.BlobId << ", size: " << msg.Blob.size()); + return; + } - ui32 StoredBlobsCount_ = 0; - ui64 StoredBlobsSize_ = 0; + LoadedBlobs_[msg.BlobId].Swap(msg.Blob); + YQL_ENSURE(LoadedBlobs_[msg.BlobId].size() != 0); - TSet LoadingBlobs_; - TMap LoadedBlobs_; + if (LoadedBlobs_.size() == 1) { + WakeUp_(); + } + } - TMaybe Error_; + void HandleWork(TEvDqSpilling::TEvError::TPtr& ev) { + auto& msg = *ev->Get(); + LOG_D("[TEvError] " << msg.Message); - TActorSystem* ActorSystem_; + Error_.ConstructInPlace(msg.Message); + } + +private: + std::mutex Mutex_; }; } // anonymous namespace @@ -229,4 +356,8 @@ IDqChannelStorageActor* CreateDqChannelStorageActor(TTxId txId, ui64 channelId, return new TDqChannelStorageActor(txId, channelId, std::move(wakeUp), actorSystem); } +IDqChannelStorageActor* CreateConcurrentDqChannelStorageActor(TTxId txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback&& wakeUp, NActors::TActorSystem* actorSystem) { + return new TConcurrentDqChannelStorageActor(txId, channelId, std::move(wakeUp), actorSystem); +} + } // namespace NYql::NDq \ No newline at end of file diff --git a/ydb/library/yql/dq/actors/spilling/channel_storage_actor.h b/ydb/library/yql/dq/actors/spilling/channel_storage_actor.h index f7c10d13c48e..85ed7681aff3 100644 --- a/ydb/library/yql/dq/actors/spilling/channel_storage_actor.h +++ b/ydb/library/yql/dq/actors/spilling/channel_storage_actor.h @@ -5,14 +5,32 @@ namespace NYql::NDq { -class IDqChannelStorageActor : public IDqChannelStorage +class IDqChannelStorageActor { public: - virtual void Terminate() = 0; + using TPtr = TIntrusivePtr; + using TWakeUpCallback = std::function; + + virtual ~IDqChannelStorageActor() = default; virtual NActors::IActor* GetActor() = 0; + + virtual bool IsEmpty() = 0; + virtual bool IsFull() = 0; + + // methods Put/Get can throw `TDqChannelStorageException` + + // Data should be owned by `blob` argument since the Put() call is actually asynchronous + virtual void Put(ui64 blobId, TRope&& blob, ui64 cookie = 0) = 0; + + // TODO: there is no way for client to delete blob. + // It is better to replace Get() with Pull() which will delete blob after read + // (current clients read each blob exactly once) + // Get() will return false if data is not ready yet. Client should repeat Get() in this case + virtual bool Get(ui64 blobId, TBuffer& data, ui64 cookie = 0) = 0; }; IDqChannelStorageActor* CreateDqChannelStorageActor(TTxId txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback&& wakeUp, NActors::TActorSystem* actorSystem); +IDqChannelStorageActor* CreateConcurrentDqChannelStorageActor(TTxId txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback&& wakeUp, NActors::TActorSystem* actorSystem); } // namespace NYql::NDq \ No newline at end of file diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.h b/ydb/library/yql/dq/runtime/dq_tasks_runner.h index ae168064878b..b50f1ba9045f 100644 --- a/ydb/library/yql/dq/runtime/dq_tasks_runner.h +++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.h @@ -137,7 +137,7 @@ class IDqTaskRunnerExecutionContext { TVector&& outputs) const = 0; virtual IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId) const = 0; - virtual IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId, NActors::TActorSystem* actorSystem) const = 0; + virtual IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId, NActors::TActorSystem* actorSystem, bool isConcurrent) const = 0; }; class TDqTaskRunnerExecutionContextBase : public IDqTaskRunnerExecutionContext { @@ -155,7 +155,7 @@ class TDqTaskRunnerExecutionContextDefault : public TDqTaskRunnerExecutionContex return {}; }; - IDqChannelStorage::TPtr CreateChannelStorage(ui64 /*channelId*/, NActors::TActorSystem* /*actorSystem*/) const override { + IDqChannelStorage::TPtr CreateChannelStorage(ui64 /*channelId*/, NActors::TActorSystem* /*actorSystem*/, bool /*isConcurrent*/) const override { return {}; }; diff --git a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp index d793bdfb4007..8dd1b059b10f 100644 --- a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp +++ b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp @@ -13,6 +13,8 @@ #include +#include + using namespace NYql::NDqs; using namespace NActors; @@ -33,6 +35,23 @@ TTaskRunnerActorSensors GetSensors(const T& t) { return result; } +class TSpillingStorageInfo : public TSimpleRefCount { +public: + using TPtr = std::shared_ptr; + + TSpillingStorageInfo(const IDqChannelStorage::TPtr spillingStorage, ui64 channelId) + : SpillingStorage(spillingStorage) + , ChannelId(channelId) + , FirstStoredId(0) + , NextStoredId(0) + {} + + const IDqChannelStorage::TPtr SpillingStorage = nullptr; + ui64 ChannelId = 0; + ui64 FirstStoredId = 0; + ui64 NextStoredId = 0; +}; + struct TOutputChannelReadResult { bool IsChanged = false; bool IsFinished = false; @@ -43,13 +62,26 @@ struct TOutputChannelReadResult { class TOutputChannelReader { public: - TOutputChannelReader(NTaskRunnerProxy::IOutputChannel::TPtr channel, i64 toPopSize, bool wasFinished) + TOutputChannelReader(NTaskRunnerProxy::IOutputChannel::TPtr channel, i64 toPopSize, + bool wasFinished, TSpillingStorageInfo::TPtr spillingStorageInfo, ui64 cookie + ) : Channel(channel) + , SpillingStorageInfo(spillingStorageInfo) , ToPopSize(toPopSize) , WasFinished(wasFinished) + , Cookie(cookie) {} TOutputChannelReadResult Read() { + if (SpillingStorageInfo) { + return ReadWithSpilling(); + } + return ReadDirectly(); + } + +private: + + TOutputChannelReadResult ReadDirectly() { int maxChunks = std::numeric_limits::max(); bool changed = false; bool isFinished = false; @@ -60,7 +92,7 @@ class TOutputChannelReader { if (remain == 0) { // special case to WorkerActor - remain = 5<<20; + remain = 5_MB; maxChunks = 1; } @@ -70,7 +102,6 @@ class TOutputChannelReader { const auto lastPop = std::move(Channel->Pop(data)); for (auto& metric : lastPop.GetMetric()) { - // *response.AddMetric() = metric; result.Metrics.push_back(metric); } @@ -89,10 +120,67 @@ class TOutputChannelReader { return result; } -private: + TOutputChannelReadResult ReadWithSpilling() { + int maxChunks = std::numeric_limits::max(); + bool changed = false; + bool isChanFinished = false; + i64 remain = ToPopSize; + bool hasData = true; + TOutputChannelReadResult result; + + if (remain == 0) { + // special case to WorkerActor + remain = 5_MB; + maxChunks = 1; + } + + auto spillingStorage = SpillingStorageInfo->SpillingStorage; + // Read all available data from the pipe and spill it + while (spillingStorage && !isChanFinished && hasData) { + TDqSerializedBatch data; + const auto lastPop = std::move(Channel->Pop(data)); + + for (auto& metric : lastPop.GetMetric()) { + result.Metrics.push_back(metric); + } + + hasData = lastPop.GetResult(); + isChanFinished = !hasData && Channel->IsFinished(); + changed = changed || hasData || (isChanFinished != WasFinished); + if (hasData) { + spillingStorage->Put(SpillingStorageInfo->NextStoredId++, SaveForSpilling(std::move(data)), Cookie); + } + } + + changed = false; + result.DataChunks.reserve(SpillingStorageInfo->NextStoredId - SpillingStorageInfo->FirstStoredId); + while (SpillingStorageInfo->FirstStoredId < SpillingStorageInfo->NextStoredId && remain > 0) { + TDqSerializedBatch data; + YQL_ENSURE(spillingStorage); + TBuffer blob; + if (!spillingStorage->Get(SpillingStorageInfo->FirstStoredId, blob, Cookie)) { + break; + } + ++SpillingStorageInfo->FirstStoredId; + data = LoadSpilled(std::move(blob)); + remain -= data.Size(); + result.DataChunks.emplace_back(std::move(data)); + --maxChunks; + changed = true; + hasData = true; + } + + result.IsFinished = isChanFinished && SpillingStorageInfo->FirstStoredId == SpillingStorageInfo->NextStoredId; + result.IsChanged = changed; + result.HasData = hasData; + return result; + } + NTaskRunnerProxy::IOutputChannel::TPtr Channel; + TSpillingStorageInfo::TPtr SpillingStorageInfo; i64 ToPopSize; bool WasFinished; + ui64 Cookie; }; } // namespace @@ -369,11 +457,15 @@ class TTaskRunnerActor auto cookie = ev->Cookie; auto wasFinished = ev->Get()->WasFinished; auto toPop = ev->Get()->Size; - Invoker->Invoke([cookie,selfId,channelId=ev->Get()->ChannelId, actorSystem, replyTo, wasFinished, toPop, taskRunner=TaskRunner, settings=Settings, stageId=StageId]() { + ui64 channelId = ev->Get()->ChannelId; + + TSpillingStorageInfo::TPtr spillingStorageInfo = GetSpillingStorage(channelId, actorSystem); + + Invoker->Invoke([spillingStorageInfo, cookie, selfId, channelId=ev->Get()->ChannelId, actorSystem, replyTo, wasFinished, toPop, taskRunner=TaskRunner, settings=Settings, stageId=StageId]() { try { // auto guard = taskRunner->BindAllocator(); // only for local mode auto channel = taskRunner->GetOutputChannel(channelId); - TOutputChannelReader reader(channel, toPop, wasFinished); + TOutputChannelReader reader(channel, toPop, wasFinished, spillingStorageInfo, cookie); TOutputChannelReadResult result = reader.Read(); NDqProto::TPopResponse response; @@ -484,6 +576,7 @@ class TTaskRunnerActor auto taskId = ev->Get()->Task.GetId(); auto& inputs = ev->Get()->Task.GetInputs(); auto startTime = TInstant::Now(); + ExecCtx = ev->Get()->ExecCtx; for (auto inputId = 0; inputId < inputs.size(); inputId++) { auto& input = inputs[inputId]; @@ -615,6 +708,22 @@ class TTaskRunnerActor }); } + TSpillingStorageInfo::TPtr GetSpillingStorage(ui64 channelId, TActorSystem* actorSystem) { + TSpillingStorageInfo::TPtr spillingStorageInfo = nullptr; + auto channelStorage = ExecCtx->CreateChannelStorage(channelId, actorSystem, true /*isConcurrent*/); + + if (channelStorage) { + auto spillingIt = SpillingStoragesInfos.find(channelId); + if (spillingIt == SpillingStoragesInfos.end()) { + TSpillingStorageInfo* info = new TSpillingStorageInfo(channelStorage, channelId); + spillingIt = SpillingStoragesInfos.emplace(channelId, info).first; + } + spillingStorageInfo = spillingIt->second; + } + + return spillingStorageInfo; + } + NActors::TActorId ParentId; ITaskRunnerActor::ICallbacks* Parent; const TString TraceId; @@ -630,6 +739,9 @@ class TTaskRunnerActor ui64 StageId; TWorkerRuntimeData* RuntimeData; TString ClusterName; + + std::shared_ptr ExecCtx; + std::unordered_map SpillingStoragesInfos; }; class TTaskRunnerActorFactory: public ITaskRunnerActorFactory {