Skip to content

Commit

Permalink
Refactor DQ Channel Storage to accept external actor system (ydb-plat…
Browse files Browse the repository at this point in the history
  • Loading branch information
Darych authored and azevaykin committed Dec 19, 2023
1 parent e6e7b74 commit 5c1aca8
Show file tree
Hide file tree
Showing 8 changed files with 64 additions and 21 deletions.
4 changes: 4 additions & 0 deletions ydb/core/tx/datashard/datashard_kqp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1110,6 +1110,10 @@ class TKqpTaskRunnerExecutionContext : public NDq::IDqTaskRunnerExecutionContext
// Channel-storage factory (single-argument overload).
// This execution context never creates a storage: the result is always an
// empty (null) pointer, and the channel id is ignored.
NDq::IDqChannelStorage::TPtr CreateChannelStorage(ui64 /* channelId */) const override {
    return nullptr;
}

// Channel-storage factory (overload taking an external actor system).
// Both arguments are ignored; the result is always an empty (null) pointer,
// same as the single-argument overload.
NDq::IDqChannelStorage::TPtr CreateChannelStorage(ui64 /* channelId */, TActorSystem* /* actorSystem */) const override {
    return nullptr;
}
};

} // anonymous namespace
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,12 @@ TDqTaskRunnerExecutionContext::TDqTaskRunnerExecutionContext(TTxId txId, bool wi
}

// Convenience overload: creates the channel storage without an external
// actor system by delegating to the two-argument overload with a null
// actor-system pointer.
IDqChannelStorage::TPtr TDqTaskRunnerExecutionContext::CreateChannelStorage(ui64 channelId) const {
    NActors::TActorSystem* const noExternalActorSystem = nullptr;
    return CreateChannelStorage(channelId, noExternalActorSystem);
}

IDqChannelStorage::TPtr TDqTaskRunnerExecutionContext::CreateChannelStorage(ui64 channelId, NActors::TActorSystem* actorSystem) const {
if (WithSpilling_) {
return CreateDqChannelStorage(TxId_, channelId, WakeUp_);
return CreateDqChannelStorage(TxId_, channelId, WakeUp_, actorSystem);
} else {
return nullptr;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
#include <ydb/library/yql/dq/common/dq_common.h>
#include <ydb/library/actors/core/actor.h>


namespace NYql {
namespace NDq {

Expand All @@ -13,6 +12,7 @@ class TDqTaskRunnerExecutionContext : public TDqTaskRunnerExecutionContextBase {
TDqTaskRunnerExecutionContext(TTxId txId, bool withSpilling, IDqChannelStorage::TWakeUpCallback&& wakeUp);

IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId) const override;
IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId, NActors::TActorSystem* actorSystem) const override;

private:
const TTxId TxId_;
Expand Down
49 changes: 35 additions & 14 deletions ydb/library/yql/dq/actors/spilling/channel_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,12 @@ class TDqChannelStorageActor : public TActorBootstrapped<TDqChannelStorageActor>
using TBase = TActorBootstrapped<TDqChannelStorageActor>;

public:
TDqChannelStorageActor(TTxId txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback&& wakeUp)
TDqChannelStorageActor(TTxId txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback&& wakeUp, TActorSystem* actorSystem)
: TxId_(txId)
, ChannelId_(channelId)
, WakeUp_(std::move(wakeUp)) {}
, WakeUp_(std::move(wakeUp))
, ActorSystem_(actorSystem)
{}

void Bootstrap() {
auto spillingActor = CreateDqLocalFileSpillingActor(TxId_, TStringBuilder() << "ChannelId: " << ChannelId_,
Expand Down Expand Up @@ -135,21 +137,21 @@ class TDqChannelStorageActor : public TActorBootstrapped<TDqChannelStorageActor>
return WritingBlobs_.size() > MAX_INFLIGHT_BLOBS_COUNT || WritingBlobsSize_ > MAX_INFLIGHT_BLOBS_SIZE;
}

void Put(ui64 blobId, TRope&& blob) {
void Put(ui64 blobId, TRope&& blob, ui64 cookie) {
FailOnError();

// TODO: timeout
// TODO: limit inflight events

ui64 size = blob.size();

Send(SpillingActorId_, new TEvDqSpilling::TEvWrite(blobId, std::move(blob)));
SendEvent(new TEvDqSpilling::TEvWrite(blobId, std::move(blob)), cookie);

WritingBlobs_.emplace(blobId, size);
WritingBlobsSize_ += size;
}

bool Get(ui64 blobId, TBuffer& blob) {
bool Get(ui64 blobId, TBuffer& blob, ui64 cookie) {
FailOnError();

auto loadedIt = LoadedBlobs_.find(blobId);
Expand All @@ -162,7 +164,7 @@ class TDqChannelStorageActor : public TActorBootstrapped<TDqChannelStorageActor>

auto result = LoadingBlobs_.emplace(blobId);
if (result.second) {
Send(SpillingActorId_, new TEvDqSpilling::TEvRead(blobId, true));
SendEvent(new TEvDqSpilling::TEvRead(blobId, true), cookie);
}

return false;
Expand All @@ -181,6 +183,23 @@ class TDqChannelStorageActor : public TActorBootstrapped<TDqChannelStorageActor>
}
}

// Sends `event` to the spilling actor (SpillingActorId_), tagged with `cookie`.
//
// If an external actor system was supplied at construction (ActorSystem_ is
// non-null), the event is wrapped in an IEventHandle with SelfId() as the
// sender and dispatched through that actor system. Otherwise the actor's own
// Send() is used.
//
// The cookie is forwarded on both paths so that callers of Put()/Get() get the
// same behavior regardless of whether an external actor system is present.
template<typename T>
void SendEvent(T* event, ui64 cookie) {
    if (ActorSystem_) {
        ActorSystem_->Send(
            new IEventHandle(
                SpillingActorId_,
                SelfId(),
                event,
                /*flags=*/0,
                cookie
            )
        );
    } else {
        // Pass flags explicitly so the cookie lands in the right parameter.
        Send(SpillingActorId_, event, /*flags=*/0, cookie);
    }
}

private:
const TTxId TxId_;
const ui64 ChannelId_;
Expand All @@ -197,13 +216,15 @@ class TDqChannelStorageActor : public TActorBootstrapped<TDqChannelStorageActor>
TMap<ui64, TBuffer> LoadedBlobs_;

TMaybe<TString> Error_;

TActorSystem* ActorSystem_;
};


class TDqChannelStorage : public IDqChannelStorage {
public:
TDqChannelStorage(TTxId txId, ui64 channelId, TWakeUpCallback&& wakeUp) {
SelfActor_ = new TDqChannelStorageActor(txId, channelId, std::move(wakeUp));
TDqChannelStorage(TTxId txId, ui64 channelId, TWakeUpCallback&& wakeUp, TActorSystem* actorSystem) {
SelfActor_ = new TDqChannelStorageActor(txId, channelId, std::move(wakeUp), actorSystem);
TlsActivationContext->AsActorContext().RegisterWithSameMailbox(SelfActor_);
}

Expand All @@ -219,12 +240,12 @@ class TDqChannelStorage : public IDqChannelStorage {
return SelfActor_->IsFull();
}

void Put(ui64 blobId, TRope&& blob) override {
SelfActor_->Put(blobId, std::move(blob));
void Put(ui64 blobId, TRope&& blob, ui64 cookie = 0) override {
SelfActor_->Put(blobId, std::move(blob), cookie);
}

bool Get(ui64 blobId, TBuffer& blob) override {
return SelfActor_->Get(blobId, blob);
bool Get(ui64 blobId, TBuffer& blob, ui64 cookie = 0) override {
return SelfActor_->Get(blobId, blob, cookie);
}

private:
Expand All @@ -233,9 +254,9 @@ class TDqChannelStorage : public IDqChannelStorage {

} // anonymous namespace

IDqChannelStorage::TPtr CreateDqChannelStorage(TTxId txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback wakeUp)
IDqChannelStorage::TPtr CreateDqChannelStorage(TTxId txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback wakeUp, TActorSystem* actorSystem)
{
return new TDqChannelStorage(txId, channelId, std::move(wakeUp));
return new TDqChannelStorage(txId, channelId, std::move(wakeUp), actorSystem);
}

} // namespace NYql::NDq
6 changes: 5 additions & 1 deletion ydb/library/yql/dq/actors/spilling/channel_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,13 @@
#include <ydb/library/yql/dq/runtime/dq_channel_storage.h>
#include <ydb/library/actors/core/actor.h>

// Forward declaration: only a pointer to the actor system appears in the
// declarations below, so the full class definition is not required here.
namespace NActors {
    class TActorSystem;
} // namespace NActors

namespace NYql::NDq {

NYql::NDq::IDqChannelStorage::TPtr CreateDqChannelStorage(TTxId txId, ui64 channelId,
NYql::NDq::IDqChannelStorage::TWakeUpCallback wakeUpCb);
NYql::NDq::IDqChannelStorage::TWakeUpCallback wakeUpCb, NActors::TActorSystem* actorSystem);

} // namespace NYql::NDq
4 changes: 2 additions & 2 deletions ydb/library/yql/dq/runtime/dq_channel_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ class IDqChannelStorage : public TSimpleRefCount<IDqChannelStorage> {
// methods Put/Get can throw `TDqChannelStorageException`

// Data should be owned by `blob` argument since the Put() call is actually asynchronous
virtual void Put(ui64 blobId, TRope&& blob) = 0;
virtual void Put(ui64 blobId, TRope&& blob, ui64 cookie = 0) = 0;

// TODO: there is no way for client to delete blob.
// It is better to replace Get() with Pull() which will delete blob after read
// (current clients read each blob exactly once)
// Get() will return false if data is not ready yet. Client should repeat Get() in this case
virtual bool Get(ui64 blobId, TBuffer& data) = 0;
virtual bool Get(ui64 blobId, TBuffer& data, ui64 cookie = 0) = 0;
};

} // namespace NYql::NDq
10 changes: 10 additions & 0 deletions ydb/library/yql/dq/runtime/dq_tasks_runner.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@
#include <util/generic/size_literals.h>
#include <util/system/types.h>

// Forward declaration: the interfaces below only take a pointer to the actor
// system, so the full class definition is not required in this header.
namespace NActors {
    class TActorSystem;
} // namespace NActors

namespace NYql::NDq {

enum class ERunStatus : ui32 {
Expand Down Expand Up @@ -133,6 +137,7 @@ class IDqTaskRunnerExecutionContext {
TVector<IDqOutput::TPtr>&& outputs) const = 0;

virtual IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId) const = 0;
virtual IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId, NActors::TActorSystem* actorSystem) const = 0;
};

class TDqTaskRunnerExecutionContextBase : public IDqTaskRunnerExecutionContext {
Expand All @@ -149,6 +154,11 @@ class TDqTaskRunnerExecutionContextDefault : public TDqTaskRunnerExecutionContex
// Default implementation: never creates a channel storage and always returns
// an empty pointer; the channel id is ignored.
IDqChannelStorage::TPtr CreateChannelStorage(ui64 /*channelId*/) const override {
    return {};
}

// Default implementation of the overload taking an external actor system:
// both arguments are ignored and an empty pointer is always returned.
IDqChannelStorage::TPtr CreateChannelStorage(ui64 /*channelId*/, NActors::TActorSystem* /*actorSystem*/) const override {
    return {};
}

};

struct TDqTaskRunnerSettings {
Expand Down
4 changes: 2 additions & 2 deletions ydb/library/yql/dq/runtime/ut/ut_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class TMockChannelStorage : public IDqChannelStorage {
return Capacity <= UsedSpace;
}

void Put(ui64 blobId, TRope&& blob) override {
void Put(ui64 blobId, TRope&& blob, ui64 /* cookie = 0 */) override {
if (UsedSpace + blob.size() > Capacity) {
ythrow yexception() << "Space limit exceeded";
}
Expand All @@ -30,7 +30,7 @@ class TMockChannelStorage : public IDqChannelStorage {
UsedSpace += result.first->second.size();
}

bool Get(ui64 blobId, TBuffer& data) override {
bool Get(ui64 blobId, TBuffer& data, ui64 /* cookie = 0 */) override {
if (!Blobs.contains(blobId)) {
ythrow yexception() << "Not found";
}
Expand Down

0 comments on commit 5c1aca8

Please sign in to comment.