From c48ea73cb02a963b103ea679cee026e91f30382e Mon Sep 17 00:00:00 2001 From: Aidar Samerkhanov Date: Mon, 18 Dec 2023 18:01:03 +0300 Subject: [PATCH] Refactor DQ Channel Storage to accept external actor system (#547) --- ydb/core/tx/datashard/datashard_kqp.cpp | 4 ++ .../compute/dq_task_runner_exec_ctx.cpp | 6 ++- .../actors/compute/dq_task_runner_exec_ctx.h | 2 +- .../dq/actors/spilling/channel_storage.cpp | 49 +++++++++++++------ .../yql/dq/actors/spilling/channel_storage.h | 6 ++- .../yql/dq/runtime/dq_channel_storage.h | 4 +- ydb/library/yql/dq/runtime/dq_tasks_runner.h | 10 ++++ ydb/library/yql/dq/runtime/ut/ut_helper.h | 4 +- 8 files changed, 64 insertions(+), 21 deletions(-) diff --git a/ydb/core/tx/datashard/datashard_kqp.cpp b/ydb/core/tx/datashard/datashard_kqp.cpp index 6dc63fd8b1d3..2dc360b98aa5 100644 --- a/ydb/core/tx/datashard/datashard_kqp.cpp +++ b/ydb/core/tx/datashard/datashard_kqp.cpp @@ -1110,6 +1110,10 @@ class TKqpTaskRunnerExecutionContext : public NDq::IDqTaskRunnerExecutionContext NDq::IDqChannelStorage::TPtr CreateChannelStorage(ui64 /* channelId */) const override { return {}; } + + NDq::IDqChannelStorage::TPtr CreateChannelStorage(ui64 /* channelId */, TActorSystem* /* actorSystem */) const override { + return {}; + } }; } // anonymous namespace diff --git a/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.cpp b/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.cpp index 757d35ebb5a0..82ccd35b5844 100644 --- a/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.cpp @@ -14,8 +14,12 @@ TDqTaskRunnerExecutionContext::TDqTaskRunnerExecutionContext(TTxId txId, bool wi } IDqChannelStorage::TPtr TDqTaskRunnerExecutionContext::CreateChannelStorage(ui64 channelId) const { + return CreateChannelStorage(channelId, nullptr); +} + +IDqChannelStorage::TPtr TDqTaskRunnerExecutionContext::CreateChannelStorage(ui64 channelId, NActors::TActorSystem* actorSystem) const { if (WithSpilling_) { - return CreateDqChannelStorage(TxId_, channelId, WakeUp_); + return CreateDqChannelStorage(TxId_, channelId, WakeUp_, actorSystem); } else { return nullptr; } diff --git a/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.h b/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.h index a405aafc1d5b..1c6272018769 100644 --- a/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.h +++ b/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.h @@ -4,7 +4,6 @@ #include #include - namespace NYql { namespace NDq { @@ -13,6 +12,7 @@ class TDqTaskRunnerExecutionContext : public TDqTaskRunnerExecutionContextBase { TDqTaskRunnerExecutionContext(TTxId txId, bool withSpilling, IDqChannelStorage::TWakeUpCallback&& wakeUp); IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId) const override; + IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId, NActors::TActorSystem* actorSystem) const override; private: const TTxId TxId_; diff --git a/ydb/library/yql/dq/actors/spilling/channel_storage.cpp b/ydb/library/yql/dq/actors/spilling/channel_storage.cpp index f15c19237944..1f8b61dcf8dc 100644 --- a/ydb/library/yql/dq/actors/spilling/channel_storage.cpp +++ b/ydb/library/yql/dq/actors/spilling/channel_storage.cpp @@ -41,10 +41,12 @@ class TDqChannelStorageActor : public TActorBootstrapped using TBase = TActorBootstrapped; public: - TDqChannelStorageActor(TTxId txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback&& wakeUp) + TDqChannelStorageActor(TTxId txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback&& wakeUp, TActorSystem* actorSystem) : TxId_(txId) , ChannelId_(channelId) - , WakeUp_(std::move(wakeUp)) {} + , WakeUp_(std::move(wakeUp)) + , ActorSystem_(actorSystem) + {} void Bootstrap() { auto spillingActor = CreateDqLocalFileSpillingActor(TxId_, TStringBuilder() << "ChannelId: " << ChannelId_, @@ -135,7 +137,7 @@ class TDqChannelStorageActor : public TActorBootstrapped return WritingBlobs_.size() > MAX_INFLIGHT_BLOBS_COUNT || WritingBlobsSize_ > MAX_INFLIGHT_BLOBS_SIZE; } - void Put(ui64 blobId, TRope&& blob) { + void Put(ui64 blobId, TRope&& blob, ui64 cookie) { FailOnError(); // TODO: timeout @@ -143,13 +145,13 @@ class TDqChannelStorageActor : public TActorBootstrapped ui64 size = blob.size(); - Send(SpillingActorId_, new TEvDqSpilling::TEvWrite(blobId, std::move(blob))); + SendEvent(new TEvDqSpilling::TEvWrite(blobId, std::move(blob)), cookie); WritingBlobs_.emplace(blobId, size); WritingBlobsSize_ += size; } - bool Get(ui64 blobId, TBuffer& blob) { + bool Get(ui64 blobId, TBuffer& blob, ui64 cookie) { FailOnError(); auto loadedIt = LoadedBlobs_.find(blobId); @@ -162,7 +164,7 @@ class TDqChannelStorageActor : public TActorBootstrapped auto result = LoadingBlobs_.emplace(blobId); if (result.second) { - Send(SpillingActorId_, new TEvDqSpilling::TEvRead(blobId, true)); + SendEvent(new TEvDqSpilling::TEvRead(blobId, true), cookie); } return false; @@ -181,6 +183,23 @@ class TDqChannelStorageActor : public TActorBootstrapped } } + template + void SendEvent(T* event, ui64 cookie) { + if (ActorSystem_) { + ActorSystem_->Send( + new IEventHandle( + SpillingActorId_, + SelfId(), + event, + /*flags=*/0, + cookie + ) + ); + } else { + Send(SpillingActorId_, event); + } + } + private: const TTxId TxId_; const ui64 ChannelId_; @@ -197,13 +216,15 @@ class TDqChannelStorageActor : public TActorBootstrapped TMap LoadedBlobs_; TMaybe Error_; + + TActorSystem* ActorSystem_; }; class TDqChannelStorage : public IDqChannelStorage { public: - TDqChannelStorage(TTxId txId, ui64 channelId, TWakeUpCallback&& wakeUp) { - SelfActor_ = new TDqChannelStorageActor(txId, channelId, std::move(wakeUp)); + TDqChannelStorage(TTxId txId, ui64 channelId, TWakeUpCallback&& wakeUp, TActorSystem* actorSystem) { + SelfActor_ = new TDqChannelStorageActor(txId, channelId, std::move(wakeUp), actorSystem); TlsActivationContext->AsActorContext().RegisterWithSameMailbox(SelfActor_); } @@ -219,12 +240,12 @@ class TDqChannelStorage : public IDqChannelStorage { return SelfActor_->IsFull(); } - void Put(ui64 blobId, TRope&& blob) override { - SelfActor_->Put(blobId, std::move(blob)); + void Put(ui64 blobId, TRope&& blob, ui64 cookie = 0) override { + SelfActor_->Put(blobId, std::move(blob), cookie); } - bool Get(ui64 blobId, TBuffer& blob) override { - return SelfActor_->Get(blobId, blob); + bool Get(ui64 blobId, TBuffer& blob, ui64 cookie = 0) override { + return SelfActor_->Get(blobId, blob, cookie); } private: @@ -233,9 +254,9 @@ class TDqChannelStorage : public IDqChannelStorage { } // anonymous namespace -IDqChannelStorage::TPtr CreateDqChannelStorage(TTxId txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback wakeUp) +IDqChannelStorage::TPtr CreateDqChannelStorage(TTxId txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback wakeUp, TActorSystem* actorSystem) { - return new TDqChannelStorage(txId, channelId, std::move(wakeUp)); + return new TDqChannelStorage(txId, channelId, std::move(wakeUp), actorSystem); } } // namespace NYql::NDq diff --git a/ydb/library/yql/dq/actors/spilling/channel_storage.h b/ydb/library/yql/dq/actors/spilling/channel_storage.h index 3a9d2b9ada21..481afbcaa48a 100644 --- a/ydb/library/yql/dq/actors/spilling/channel_storage.h +++ b/ydb/library/yql/dq/actors/spilling/channel_storage.h @@ -4,9 +4,13 @@ #include #include +namespace NActors { + class TActorSystem; +}; + namespace NYql::NDq { NYql::NDq::IDqChannelStorage::TPtr CreateDqChannelStorage(TTxId txId, ui64 channelId, - NYql::NDq::IDqChannelStorage::TWakeUpCallback wakeUpCb); + NYql::NDq::IDqChannelStorage::TWakeUpCallback wakeUpCb, NActors::TActorSystem* actorSystem); } // namespace NYql::NDq diff --git a/ydb/library/yql/dq/runtime/dq_channel_storage.h b/ydb/library/yql/dq/runtime/dq_channel_storage.h index 9222923ebd9a..1eed0db0136f 100644 --- a/ydb/library/yql/dq/runtime/dq_channel_storage.h +++ b/ydb/library/yql/dq/runtime/dq_channel_storage.h @@ -26,13 +26,13 @@ class IDqChannelStorage : public TSimpleRefCount { // methods Put/Get can throw `TDqChannelStorageException` // Data should be owned by `blob` argument since the Put() call is actually asynchronous - virtual void Put(ui64 blobId, TRope&& blob) = 0; + virtual void Put(ui64 blobId, TRope&& blob, ui64 cookie = 0) = 0; // TODO: there is no way for client to delete blob. // It is better to replace Get() with Pull() which will delete blob after read // (current clients read each blob exactly once) // Get() will return false if data is not ready yet. Client should repeat Get() in this case - virtual bool Get(ui64 blobId, TBuffer& data) = 0; + virtual bool Get(ui64 blobId, TBuffer& data, ui64 cookie = 0) = 0; }; } // namespace NYql::NDq diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.h b/ydb/library/yql/dq/runtime/dq_tasks_runner.h index d5353a5ecc27..ae168064878b 100644 --- a/ydb/library/yql/dq/runtime/dq_tasks_runner.h +++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.h @@ -22,6 +22,10 @@ #include #include +namespace NActors { + class TActorSystem; +}; + namespace NYql::NDq { enum class ERunStatus : ui32 { @@ -133,6 +137,7 @@ class IDqTaskRunnerExecutionContext { TVector&& outputs) const = 0; virtual IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId) const = 0; + virtual IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId, NActors::TActorSystem* actorSystem) const = 0; }; class TDqTaskRunnerExecutionContextBase : public IDqTaskRunnerExecutionContext { @@ -149,6 +154,11 @@ class TDqTaskRunnerExecutionContextDefault : public TDqTaskRunnerExecutionContex IDqChannelStorage::TPtr CreateChannelStorage(ui64 /*channelId*/) const override { return {}; }; + + IDqChannelStorage::TPtr CreateChannelStorage(ui64 /*channelId*/, NActors::TActorSystem* /*actorSystem*/) const override { + return {}; + }; + }; struct TDqTaskRunnerSettings { diff --git a/ydb/library/yql/dq/runtime/ut/ut_helper.h b/ydb/library/yql/dq/runtime/ut/ut_helper.h index 56faea9877b8..05546b8003d9 100644 --- a/ydb/library/yql/dq/runtime/ut/ut_helper.h +++ b/ydb/library/yql/dq/runtime/ut/ut_helper.h @@ -20,7 +20,7 @@ class TMockChannelStorage : public IDqChannelStorage { return Capacity <= UsedSpace; } - void Put(ui64 blobId, TRope&& blob) override { + void Put(ui64 blobId, TRope&& blob, ui64 /* cookie = 0 */) override { if (UsedSpace + blob.size() > Capacity) { ythrow yexception() << "Space limit exceeded"; } @@ -30,7 +30,7 @@ class TMockChannelStorage : public IDqChannelStorage { UsedSpace += result.first->second.size(); } - bool Get(ui64 blobId, TBuffer& data) override { + bool Get(ui64 blobId, TBuffer& data, ui64 /* cookie = 0 */) override { if (!Blobs.contains(blobId)) { ythrow yexception() << "Not found"; }