Skip to content

Commit

Permalink
YQL-17087. Fix work with actor system in case of async compute actors.
Browse files Browse the repository at this point in the history
Fix response result field set for reads without spilling.
Add enable-spilling flag to worker node.
  • Loading branch information
Darych committed Dec 27, 2023
1 parent ad45285 commit 672f769
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 4 deletions.
19 changes: 17 additions & 2 deletions ydb/library/yql/dq/actors/spilling/channel_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,30 @@ namespace {

class TDqChannelStorage : public IDqChannelStorage {
public:
TDqChannelStorage(TTxId txId, ui64 channelId, TWakeUpCallback&& wakeUp, TActorSystem* actorSystem, bool isConcurrent) {
TDqChannelStorage(TTxId txId, ui64 channelId, TWakeUpCallback&& wakeUp, TActorSystem* actorSystem, bool isConcurrent)
: ActorSystem_(actorSystem)
{
if (isConcurrent) {
SelfActor_ = CreateConcurrentDqChannelStorageActor(txId, channelId, std::move(wakeUp), actorSystem);
} else {
SelfActor_ = CreateDqChannelStorageActor(txId, channelId, std::move(wakeUp), actorSystem);
}
SelfActorId_ = TlsActivationContext->AsActorContext().RegisterWithSameMailbox(SelfActor_->GetActor());
SelfId_ = TlsActivationContext->AsActorContext().SelfID;
}

~TDqChannelStorage() {
TlsActivationContext->AsActorContext().Send(SelfActorId_, new TEvents::TEvPoison);
if (ActorSystem_) {
ActorSystem_->Send(
new IEventHandle(
SelfActorId_,
SelfId_,
new TEvents::TEvPoison,
/*flags=*/0,
/*cookie=*/0));
} else {
TlsActivationContext->AsActorContext().Send(SelfActorId_, new TEvents::TEvPoison);
}
}

bool IsEmpty() const override {
Expand All @@ -54,6 +67,8 @@ class TDqChannelStorage : public IDqChannelStorage {
private:
IDqChannelStorageActor* SelfActor_;
TActorId SelfActorId_;
TActorId SelfId_;
TActorSystem *ActorSystem_;
};

} // anonymous namespace
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ class TOutputChannelReader {
}

hasData = lastPop.GetResult();
result.HasData = result.HasData || hasData;
dataSize = data.Size();
isFinished = !hasData && Channel->IsFinished();
changed = changed || hasData || (isFinished != WasFinished);
Expand All @@ -116,7 +117,6 @@ class TOutputChannelReader {
}
result.IsFinished = isFinished;
result.IsChanged = changed;
result.HasData = hasData;
return result;
}

Expand Down
3 changes: 2 additions & 1 deletion ydb/library/yql/tools/dq/worker_node/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ int main(int argc, char** argv) {
opts.AddLongOption("disable_pipe", "Disable pipe").NoArgument();
opts.AddLongOption("log_level", "Log Level");
opts.AddLongOption("ipv4", "Use ipv4").NoArgument();
opts.AddLongOption("enable-spilling", "Enable disk spilling").NoArgument();

ui32 threads = THREAD_PER_NODE;
TString host;
Expand Down Expand Up @@ -405,7 +406,7 @@ int main(int argc, char** argv) {
})
: NTaskRunnerActor::CreateTaskRunnerActorFactory(lwmOptions.Factory, lwmOptions.TaskRunnerInvokerFactory);
lwmOptions.ComputeActorOwnsCounters = true;
lwmOptions.UseSpilling = true;
lwmOptions.UseSpilling = res.Has("enable-spilling");
auto resman = NDqs::CreateLocalWorkerManager(lwmOptions);

auto workerManagerActorId = actorSystem->Register(resman);
Expand Down

0 comments on commit 672f769

Please sign in to comment.