Skip to content

Commit

Permalink
YQL-17087: Add channel spilling to dq pipe communication (ydb-platfor…
Browse files Browse the repository at this point in the history
…m#612)

* Add channel spilling to dq pipe communication
* Add EvPoison handler to DqChannelStorageActor
  • Loading branch information
Darych authored and marsaly79 committed Dec 22, 2023
1 parent 14f87e8 commit d13e6e7
Show file tree
Hide file tree
Showing 9 changed files with 362 additions and 96 deletions.
2 changes: 1 addition & 1 deletion ydb/core/tx/datashard/datashard_kqp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1111,7 +1111,7 @@ class TKqpTaskRunnerExecutionContext : public NDq::IDqTaskRunnerExecutionContext
return {};
}

NDq::IDqChannelStorage::TPtr CreateChannelStorage(ui64 /* channelId */, TActorSystem* /* actorSystem */) const override {
NDq::IDqChannelStorage::TPtr CreateChannelStorage(ui64 /* channelId */, TActorSystem* /* actorSystem */, bool /*isConcurrent*/) const override {
return {};
}
};
Expand Down
6 changes: 3 additions & 3 deletions ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ TDqTaskRunnerExecutionContext::TDqTaskRunnerExecutionContext(TTxId txId, bool wi
}

IDqChannelStorage::TPtr TDqTaskRunnerExecutionContext::CreateChannelStorage(ui64 channelId) const {
return CreateChannelStorage(channelId, nullptr);
return CreateChannelStorage(channelId, nullptr, false);
}

IDqChannelStorage::TPtr TDqTaskRunnerExecutionContext::CreateChannelStorage(ui64 channelId, NActors::TActorSystem* actorSystem) const {
IDqChannelStorage::TPtr TDqTaskRunnerExecutionContext::CreateChannelStorage(ui64 channelId, NActors::TActorSystem* actorSystem, bool isConcurrent) const {
if (WithSpilling_) {
return CreateDqChannelStorage(TxId_, channelId, WakeUp_, actorSystem);
return CreateDqChannelStorage(TxId_, channelId, WakeUp_, actorSystem, isConcurrent);
} else {
return nullptr;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class TDqTaskRunnerExecutionContext : public TDqTaskRunnerExecutionContextBase {
TDqTaskRunnerExecutionContext(TTxId txId, bool withSpilling, IDqChannelStorage::TWakeUpCallback&& wakeUp);

IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId) const override;
IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId, NActors::TActorSystem* actorSystem) const override;
IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId, NActors::TActorSystem* actorSystem, bool isConcurrent) const override;

private:
const TTxId TxId_;
Expand Down
17 changes: 11 additions & 6 deletions ydb/library/yql/dq/actors/spilling/channel_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,17 @@ namespace {

class TDqChannelStorage : public IDqChannelStorage {
public:
TDqChannelStorage(TTxId txId, ui64 channelId, TWakeUpCallback&& wakeUp, TActorSystem* actorSystem) {
SelfActor_ = CreateDqChannelStorageActor(txId, channelId, std::move(wakeUp), actorSystem);
TlsActivationContext->AsActorContext().RegisterWithSameMailbox(SelfActor_->GetActor());
TDqChannelStorage(TTxId txId, ui64 channelId, TWakeUpCallback&& wakeUp, TActorSystem* actorSystem, bool isConcurrent) {
if (isConcurrent) {
SelfActor_ = CreateConcurrentDqChannelStorageActor(txId, channelId, std::move(wakeUp), actorSystem);
} else {
SelfActor_ = CreateDqChannelStorageActor(txId, channelId, std::move(wakeUp), actorSystem);
}
SelfActorId_ = TlsActivationContext->AsActorContext().RegisterWithSameMailbox(SelfActor_->GetActor());
}

~TDqChannelStorage() {
SelfActor_->Terminate();
TlsActivationContext->AsActorContext().Send(SelfActorId_, new TEvents::TEvPoison);
}

bool IsEmpty() const override {
Expand All @@ -49,13 +53,14 @@ class TDqChannelStorage : public IDqChannelStorage {

private:
IDqChannelStorageActor* SelfActor_;
TActorId SelfActorId_;
};

} // anonymous namespace

IDqChannelStorage::TPtr CreateDqChannelStorage(TTxId txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback wakeUp, TActorSystem* actorSystem)
IDqChannelStorage::TPtr CreateDqChannelStorage(TTxId txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback wakeUp, TActorSystem* actorSystem, bool isConcurrent)
{
return new TDqChannelStorage(txId, channelId, std::move(wakeUp), actorSystem);
return new TDqChannelStorage(txId, channelId, std::move(wakeUp), actorSystem, isConcurrent);
}

} // namespace NYql::NDq
4 changes: 2 additions & 2 deletions ydb/library/yql/dq/actors/spilling/channel_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace NActors {

namespace NYql::NDq {

NYql::NDq::IDqChannelStorage::TPtr CreateDqChannelStorage(TTxId txId, ui64 channelId,
NYql::NDq::IDqChannelStorage::TWakeUpCallback wakeUpCb, NActors::TActorSystem* actorSystem);
IDqChannelStorage::TPtr CreateDqChannelStorage(TTxId txId, ui64 channelId,
IDqChannelStorage::TWakeUpCallback wakeUpCb, NActors::TActorSystem* actorSystem, bool isConcurrent);

} // namespace NYql::NDq
Loading

0 comments on commit d13e6e7

Please sign in to comment.