Skip to content

Commit

Permalink
Merge 7a291e9 into 97d8627
Browse files Browse the repository at this point in the history
  • Loading branch information
kardymonds authored May 22, 2024
2 parents 97d8627 + 7a291e9 commit cfac4a0
Show file tree
Hide file tree
Showing 55 changed files with 687 additions and 376 deletions.
10 changes: 6 additions & 4 deletions ydb/core/fq/libs/checkpoint_storage/state_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

#include <ydb/core/fq/libs/checkpointing_common/defs.h>

#include <ydb/library/yql/dq/proto/dq_checkpoint.pb.h>
#include <ydb/library/yql/dq/actors/compute/dq_checkpoints_states.h>

#include <ydb/library/yql/public/issue/yql_issue.h>

#include <library/cpp/threading/future/core/future.h>
Expand All @@ -15,16 +16,17 @@ namespace NFq {

class IStateStorage : public virtual TThrRefBase {
public:
using TGetStateResult = std::pair<std::vector<NYql::NDqProto::TComputeActorState>, NYql::TIssues>;
using TGetStateResult = std::pair<std::vector<NYql::NDq::TComputeActorState>, NYql::TIssues>;
using TSaveStateResult = std::pair<size_t, NYql::TIssues>;
using TCountStatesResult = std::pair<size_t, NYql::TIssues>;

virtual NThreading::TFuture<NYql::TIssues> Init() = 0;

virtual NThreading::TFuture<NYql::TIssues> SaveState(
virtual NThreading::TFuture<TSaveStateResult> SaveState(
ui64 taskId,
const TString& graphId,
const TCheckpointId& checkpointId,
const NYql::NDqProto::TComputeActorState& state) = 0;
const NYql::NDq::TComputeActorState& state) = 0;

virtual NThreading::TFuture<TGetStateResult> GetState(
const std::vector<ui64>& taskIds,
Expand Down
7 changes: 3 additions & 4 deletions ydb/core/fq/libs/checkpoint_storage/storage_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -347,14 +347,13 @@ void TStorageProxy::Handle(NYql::NDq::TEvDqCompute::TEvSaveTaskState::TPtr& ev)
taskId = event->TaskId,
cookie = ev->Cookie,
sender = ev->Sender,
stateSize = stateSize,
actorSystem = TActivationContext::ActorSystem()](const NThreading::TFuture<NYql::TIssues>& futureResult) {
actorSystem = TActivationContext::ActorSystem()](const NThreading::TFuture<IStateStorage::TSaveStateResult>& futureResult) {
LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(*actorSystem, "[" << graphId << "] [" << checkpointId << "] TEvSaveTaskState Apply: task: " << taskId)
const auto& issues = futureResult.GetValue();
const auto& issues = futureResult.GetValue().second;
auto response = std::make_unique<NYql::NDq::TEvDqCompute::TEvSaveTaskStateResult>();
response->Record.MutableCheckpoint()->SetGeneration(checkpointId.CoordinatorGeneration);
response->Record.MutableCheckpoint()->SetId(checkpointId.SeqNo);
response->Record.SetStateSizeBytes(stateSize);
response->Record.SetStateSizeBytes(futureResult.GetValue().first);
response->Record.SetTaskId(taskId);

if (issues) {
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/fq/libs/checkpoint_storage/ut/gc_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ namespace {

////////////////////////////////////////////////////////////////////////////////

NYql::NDqProto::TComputeActorState MakeStateFromBlob(size_t blobSize, bool isIncrement = false) {
NYql::NDq::TComputeActorState MakeStateFromBlob(size_t blobSize, bool isIncrement = false) {
TString blob;
blob.reserve(blobSize);
for (size_t i = 0; i < blobSize; ++i) {
Expand All @@ -51,8 +51,8 @@ NYql::NDqProto::TComputeActorState MakeStateFromBlob(size_t blobSize, bool isInc
const TStringBuf savedBuf = value.AsStringRef();
TString result;
NKikimr::NMiniKQL::TNodeStateHelper::AddNodeState(result, savedBuf);
NYql::NDqProto::TComputeActorState state;
state.MutableMiniKqlProgram()->MutableData()->MutableStateData()->SetBlob(result);
NYql::NDq::TComputeActorState state;
state.MiniKqlProgram.ConstructInPlace().Data.Blob = result;
return state;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ void SaveState(
checkpoint.SetGeneration(checkpointId.CoordinatorGeneration);
checkpoint.SetId(checkpointId.SeqNo);
auto request = std::make_unique<NYql::NDq::TEvDqCompute::TEvSaveTaskState>(GraphId, taskId, checkpoint);
request->State.MutableMiniKqlProgram()->MutableData()->MutableStateData()->SetBlob(blob);
request->State.MiniKqlProgram.ConstructInPlace().Data.Blob = blob;
runtime->Send(new IEventHandle(NYql::NDq::MakeCheckpointStorageID(), sender, request.release()));

TAutoPtr<IEventHandle> handle;
Expand Down Expand Up @@ -247,7 +247,7 @@ TString GetState(
UNIT_ASSERT(event->Issues.Empty());
UNIT_ASSERT(!event->States.empty());

return event->States[0].GetMiniKqlProgram().GetData().GetStateData().GetBlob();
return event->States[0].MiniKqlProgram->Data.Blob;
}

void CreateCompletedCheckpoint(
Expand Down
88 changes: 59 additions & 29 deletions ydb/core/fq/libs/checkpoint_storage/ut/ydb_state_storage_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,15 @@ class TFixture : public NUnitTest::TBaseFixture {
return storage;
}

NYql::NDqProto::TComputeActorState MakeState(NYql::NUdf::TUnboxedValuePod&& value) {
NYql::NDq::TComputeActorState MakeState(NYql::NUdf::TUnboxedValuePod&& value) {
TString result;
NKikimr::NMiniKQL::TNodeStateHelper::AddNodeState(result, value.AsStringRef());
NYql::NDqProto::TComputeActorState state;
state.MutableMiniKqlProgram()->MutableData()->MutableStateData()->SetBlob(result);
NYql::NDq::TComputeActorState state;
state.MiniKqlProgram.ConstructInPlace().Data.Blob = result;
return state;
}

NYql::NDqProto::TComputeActorState MakeStateFromBlob(size_t blobSize) {
NYql::NDq::TComputeActorState MakeStateFromBlob(size_t blobSize) {
TString blob;
blob.reserve(blobSize);
for (size_t i = 0; i < blobSize; ++i) {
Expand All @@ -69,7 +69,7 @@ class TFixture : public NUnitTest::TBaseFixture {
return MakeState(NKikimr::NMiniKQL::TOutputSerializer::MakeSimpleBlobState(blob, 0));
}

NYql::NDqProto::TComputeActorState MakeIncrementState(size_t miniKqlPStateSize) {
NYql::NDq::TComputeActorState MakeIncrementState(size_t miniKqlPStateSize) {
std::map<TString, TString> map;
size_t itemCount = 4;
for (size_t i = 0; i < itemCount; ++i) {
Expand All @@ -78,7 +78,7 @@ class TFixture : public NUnitTest::TBaseFixture {
return MakeState(NKikimr::NMiniKQL::TOutputSerializer::MakeSnapshotState(map, 0));
}

NYql::NDqProto::TComputeActorState MakeIncrementState(
NYql::NDq::TComputeActorState MakeIncrementState(
const std::map<TString, TString>& snapshot,
const std::map<TString, TString>& increment,
const std::set<TString>& deleted)
Expand All @@ -94,13 +94,14 @@ class TFixture : public NUnitTest::TBaseFixture {
ui64 taskId,
const TString& graphId,
const TCheckpointId& checkpointId,
const NYql::NDqProto::TComputeActorState& state)
const NYql::NDq::TComputeActorState& state)
{
auto issues = storage->SaveState(taskId, graphId, checkpointId, state).GetValueSync();
auto [size, issues] = storage->SaveState(taskId, graphId, checkpointId, state).GetValueSync();
UNIT_ASSERT_C(issues.Empty(), issues.ToString());
UNIT_ASSERT(size > 0);
}

NYql::NDqProto::TComputeActorState GetState(
NYql::NDq::TComputeActorState GetState(
TStateStoragePtr storage,
const ui64 taskId,
const TString& graphId,
Expand All @@ -112,16 +113,43 @@ class TFixture : public NUnitTest::TBaseFixture {
return states[0];
}

void ShouldSaveGetStateImpl(const char* tablePrefix, const NYql::NDqProto::TComputeActorState& state) {
void ShouldSaveGetStateImpl(const char* tablePrefix, const NYql::NDq::TComputeActorState& state) {
auto storage = GetStateStorage(tablePrefix);
auto issues = storage->SaveState(1, "graph1", CheckpointId1, state).GetValueSync();
auto [size, issues] = storage->SaveState(1, "graph1", CheckpointId1, state).GetValueSync();
UNIT_ASSERT_C(issues.Empty(), issues.ToString());
UNIT_ASSERT(size > 0);

auto [states, getIssues] = storage->GetState({1}, "graph1", CheckpointId1).GetValueSync();
UNIT_ASSERT_C(getIssues.Empty(), getIssues.ToString());
UNIT_ASSERT(!states.empty());
UNIT_ASSERT(google::protobuf::util::MessageDifferencer::Equals(state, states[0]));
CheckEquals(state, states[0]);
}

void CheckEquals(const NYql::NDq::TComputeActorState& state1, const NYql::NDq::TComputeActorState& state2) {
UNIT_ASSERT_VALUES_EQUAL(state1.MiniKqlProgram.Empty(), state2.MiniKqlProgram.Empty());
if (state1.MiniKqlProgram) {
UNIT_ASSERT_VALUES_EQUAL(state1.MiniKqlProgram->Data.Blob, state2.MiniKqlProgram->Data.Blob);
UNIT_ASSERT_VALUES_EQUAL(state1.MiniKqlProgram->Data.Version, state2.MiniKqlProgram->Data.Version);
UNIT_ASSERT_VALUES_EQUAL(state1.MiniKqlProgram->RuntimeVersion, state2.MiniKqlProgram->RuntimeVersion);
}
UNIT_ASSERT_VALUES_EQUAL(state1.Sources.size(), state2.Sources.size());
UNIT_ASSERT(std::equal(std::begin(state1.Sources), std::end(state1.Sources), std::begin(state2.Sources), std::end(state2.Sources),
[](const NYql::NDq::TSourceState& state1, const NYql::NDq::TSourceState& state2) {
UNIT_ASSERT_VALUES_EQUAL(state1.InputIndex, state2.InputIndex);
UNIT_ASSERT_VALUES_EQUAL(state1.Data.size(), state2.Data.size());
return true;
}));

UNIT_ASSERT_VALUES_EQUAL(state1.Sinks.size(), state2.Sinks.size());
UNIT_ASSERT(std::equal(std::begin(state1.Sinks), std::end(state1.Sinks), std::begin(state2.Sinks), std::end(state2.Sinks),
[](const NYql::NDq::TSinkState& state1, const NYql::NDq::TSinkState& state2) {
UNIT_ASSERT_VALUES_EQUAL(state1.OutputIndex, state2.OutputIndex);
UNIT_ASSERT_VALUES_EQUAL(state1.Data.Blob, state2.Data.Blob);
UNIT_ASSERT_VALUES_EQUAL(state1.Data.Version, state2.Data.Version);
return true;
}));
}

private:
NKikimr::NMiniKQL::TScopedAlloc Alloc;
NKikimr::TActorSystemStub ActorSystemStub;
Expand All @@ -147,8 +175,8 @@ Y_UNIT_TEST_SUITE(TStateStorageTest) {
auto state2 = NKikimr::NMiniKQL::TOutputSerializer::MakeSimpleBlobState(TString(20, 'b'), 0);
NKikimr::NMiniKQL::TNodeStateHelper::AddNodeState(result, state1.AsStringRef());
NKikimr::NMiniKQL::TNodeStateHelper::AddNodeState(result, state2.AsStringRef());
NYql::NDqProto::TComputeActorState state;
state.MutableMiniKqlProgram()->MutableData()->MutableStateData()->SetBlob(result);
NYql::NDq::TComputeActorState state;
state.MiniKqlProgram.ConstructInPlace().Data.Blob = result;
ShouldSaveGetStateImpl("TStateStorageTestShouldSaveGetState", state);
}

Expand Down Expand Up @@ -300,8 +328,9 @@ Y_UNIT_TEST_SUITE(TStateStorageTest) {
Y_UNIT_TEST_F(ShouldIssueErrorOnNonExistentState, TFixture) {
auto storage = GetStateStorage("TStateStorageTestShouldIssueErrorOnNonExistentState");

auto issues = storage->SaveState(1, "graph1", CheckpointId1, MakeStateFromBlob(4)).GetValueSync();
auto [size, issues] = storage->SaveState(1, "graph1", CheckpointId1, MakeStateFromBlob(4)).GetValueSync();
UNIT_ASSERT(issues.Empty());
UNIT_ASSERT(size > 0);

auto getResult = storage->GetState({1}, "graph1", CheckpointId1).GetValueSync();
UNIT_ASSERT(getResult.second.Empty());
Expand All @@ -321,32 +350,33 @@ Y_UNIT_TEST_SUITE(TStateStorageTest) {
auto state3 = MakeStateFromBlob(YdbRowSizeLimit * 6);
auto state4 = MakeIncrementState(YdbRowSizeLimit * 3);

auto issues = storage->SaveState(1, "graph1", CheckpointId1, state1).GetValueSync();
auto [size, issues] = storage->SaveState(1, "graph1", CheckpointId1, state1).GetValueSync();
UNIT_ASSERT(issues.Empty());
issues = storage->SaveState(42, "graph1", CheckpointId1, state2).GetValueSync();
UNIT_ASSERT(size > 0);
issues = storage->SaveState(42, "graph1", CheckpointId1, state2).GetValueSync().second;
UNIT_ASSERT(issues.Empty());
issues = storage->SaveState(7, "graph1", CheckpointId1, state3).GetValueSync();
issues = storage->SaveState(7, "graph1", CheckpointId1, state3).GetValueSync().second;
UNIT_ASSERT(issues.Empty());
issues = storage->SaveState(13, "graph1", CheckpointId1, state4).GetValueSync();
issues = storage->SaveState(13, "graph1", CheckpointId1, state4).GetValueSync().second;
UNIT_ASSERT(issues.Empty());

auto [states, getIssues] = storage->GetState({1, 42, 7, 13}, "graph1", CheckpointId1).GetValueSync();
UNIT_ASSERT_C(getIssues.Empty(), getIssues.ToString());
UNIT_ASSERT_VALUES_EQUAL(states.size(), 4);

UNIT_ASSERT(google::protobuf::util::MessageDifferencer::Equals(state1, states[0]));
UNIT_ASSERT(google::protobuf::util::MessageDifferencer::Equals(state2, states[1]));
UNIT_ASSERT(google::protobuf::util::MessageDifferencer::Equals(state3, states[2]));
UNIT_ASSERT(google::protobuf::util::MessageDifferencer::Equals(state4, states[3]));
CheckEquals(state1, states[0]);
CheckEquals(state2, states[1]);
CheckEquals(state3, states[2]);
CheckEquals(state4, states[3]);

// in different order
auto [states2, getIssues2] = storage->GetState({42, 1, 13, 7}, "graph1", CheckpointId1).GetValueSync();
UNIT_ASSERT(getIssues2.Empty());
UNIT_ASSERT_VALUES_EQUAL(states2.size(), 4);
UNIT_ASSERT(google::protobuf::util::MessageDifferencer::Equals(state2, states2[0]));
UNIT_ASSERT(google::protobuf::util::MessageDifferencer::Equals(state1, states2[1]));
UNIT_ASSERT(google::protobuf::util::MessageDifferencer::Equals(state4, states2[2]));
UNIT_ASSERT(google::protobuf::util::MessageDifferencer::Equals(state3, states2[3]));
CheckEquals(state2, states2[0]);
CheckEquals(state1, states2[1]);
CheckEquals(state4, states2[2]);
CheckEquals(state3, states2[3]);
}

Y_UNIT_TEST_F(ShouldLoadLastSnapshot, TFixture)
Expand All @@ -360,7 +390,7 @@ Y_UNIT_TEST_SUITE(TStateStorageTest) {
SaveState(storage, 1, "graph1", CheckpointId2, state2);

auto state = GetState(storage, 1, "graph1", CheckpointId2);
UNIT_ASSERT(google::protobuf::util::MessageDifferencer::Equals(state, state2));
CheckEquals(state, state2);
}

Y_UNIT_TEST_F(ShouldNotGetNonExistendSnaphotState, TFixture)
Expand Down Expand Up @@ -392,7 +422,7 @@ Y_UNIT_TEST_SUITE(TStateStorageTest) {
auto expected = MakeIncrementState({{"key1", "value1-new"}, {"key3", "value3"}, {"key4", value4}}, {}, {});

auto actual = GetState(storage, 1, "graph1", CheckpointId3);
UNIT_ASSERT(google::protobuf::util::MessageDifferencer::Equals(expected, actual));
CheckEquals(expected, actual);
}

};
Expand Down
Loading

0 comments on commit cfac4a0

Please sign in to comment.