Skip to content

Commit

Permalink
KIKIMR-20150: fix and improve flaky test (ydb-platform#713)
Browse files Browse the repository at this point in the history
  • Loading branch information
eivanov89 authored and adameat committed Dec 29, 2023
1 parent c352cb4 commit 491dd86
Showing 1 changed file with 106 additions and 8 deletions.
114 changes: 106 additions & 8 deletions ydb/core/client/client_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2139,7 +2139,8 @@ Y_UNIT_TEST_SUITE(TClientTest) {
TPortManager tp;
ui16 port = tp.GetPort(2134);

const auto settings = TServerSettings(port);
const auto settings = TServerSettings(port)
.SetUseRealThreads(false);
TServer server(settings);
TClient client(settings);
SetupLogging(server);
Expand All @@ -2153,32 +2154,129 @@ Y_UNIT_TEST_SUITE(TClientTest) {
const TActorId edge = runtime.AllocateEdgeActor();

{
ui64 confirmationsCount = 0;
auto observeConfirmations = [&](TAutoPtr<IEventHandle>& ev) {
switch (ev->GetTypeRewrite()) {
case TEvBlobStorage::TEvPut::EventType: {
const auto* msg = ev->Get<TEvBlobStorage::TEvPut>();
// step 1 is snapshot
// step 2 is schema alter
// step 3 is expected write below
if (msg->Id.TabletID() == tabletId &&
msg->Id.Channel() == 0 &&
msg->Id.Cookie() == 1 &&
msg->Id.Step() > 2)
{
++confirmationsCount;
}
break;
}
}
return TTestActorRuntime::EEventAction::PROCESS;
};
runtime.SetObserverFunc(observeConfirmations);

const TActorId leaderTablet = runtime.Register(CreateTablet(edge, tabletInfo.Get(), setupInfo.Get(), 0, nullptr, nullptr));
const TActorId leaderId = runtime.GrabEdgeEvent<TEvTablet::TEvRestored>(edge)->Get()->UserTabletActor;
Y_UNUSED(leaderId);

// we use it to kill leader only when it has sent the write to the follower and it is confirmed
const TActorId followerTablet = runtime.Register(CreateTabletFollower(edge, tabletInfo.Get(), setupInfo.Get(), 1, nullptr, nullptr));

auto doLeaderWrite = [&](ui64 key, ui64 value) {
const char *writeQuery = R"__((
(let row_ '('('key (Uint64 '%lu))))
(let update_ '('('v_ui64 (Uint64 '%lu))))
(let result_ (UpdateRow 't_by_ui64 row_ update_))
(return (AsList result_))
))__";

THolder<TEvTablet::TEvLocalMKQL> reqWrite = MakeHolder<TEvTablet::TEvLocalMKQL>();
reqWrite->Record.MutableProgram()->MutableProgram()->SetText(Sprintf(writeQuery, key, value));
runtime.Send(new IEventHandle(leaderId, edge, reqWrite.Release()));

auto reply = runtime.GrabEdgeEvent<TEvTablet::TEvLocalMKQLResponse>(edge);
UNIT_ASSERT_VALUES_EQUAL(reply->Get()->Record.GetStatus(), 0);
};

doLeaderWrite(42, 51);

auto waitFor = [&](const auto& condition, const TString& description) {
if (!condition()) {
Cerr << "... waiting for " << description << Endl;
TDispatchOptions options;
options.CustomFinalCondition = [&]() {
return condition();
};
runtime.DispatchEvents(options);
UNIT_ASSERT_C(condition(), "... failed to wait for " << description);
}
};

waitFor([&](){ return confirmationsCount > 0; }, "Write confirmed");

runtime.Send(new IEventHandle(leaderTablet, edge, new TEvents::TEvPoisonPill()));
auto reply = runtime.GrabEdgeEvent<TEvTablet::TEvTabletDead>(edge);
UNIT_ASSERT_VALUES_EQUAL(reply->Get()->TabletID, tabletId);

runtime.Send(new IEventHandle(followerTablet, edge, new TEvents::TEvPoisonPill()));
reply = runtime.GrabEdgeEvent<TEvTablet::TEvTabletDead>(edge);
UNIT_ASSERT_VALUES_EQUAL(reply->Get()->TabletID, tabletId);
}

const TActorId followerTablet = runtime.Register(CreateTabletFollower(edge, tabletInfo.Get(), setupInfo.Get(), 1, nullptr, nullptr));
Y_UNUSED(followerTablet);
// now we start follower without its leader

const TActorId followerId = runtime.GrabEdgeEvent<TEvTablet::TEvRestored>(edge)->Get()->UserTabletActor;
Y_UNUSED(followerId);
const TActorId followerEdge = runtime.AllocateEdgeActor();
const TActorId followerTablet = runtime.Register(CreateTabletFollower(followerEdge, tabletInfo.Get(), setupInfo.Get(), 1, nullptr, nullptr));
Y_UNUSED(followerTablet);
const TActorId followerId = runtime.GrabEdgeEvent<TEvTablet::TEvRestored>(followerEdge)->Get()->UserTabletActor;

{
NTabletPipe::TClientConfig pipeClientConfig;
pipeClientConfig.AllowFollower = true;
pipeClientConfig.ForceFollower = true;
pipeClientConfig.RetryPolicy = {.RetryLimitCount = 2};
runtime.Register(NTabletPipe::CreateClient(edge, tabletId, pipeClientConfig));
runtime.Register(NTabletPipe::CreateClient(followerEdge, tabletId, pipeClientConfig));

auto reply = runtime.GrabEdgeEvent<TEvTabletPipe::TEvClientConnected>(edge);
auto reply = runtime.GrabEdgeEvent<TEvTabletPipe::TEvClientConnected>(followerEdge);

UNIT_ASSERT_VALUES_EQUAL(reply->Get()->Status, NKikimrProto::OK);
}

auto doFollowerRead = [&](ui64 key) -> TMaybe<ui64> {
const char *readQuery = R"__((
(let row_ '('('key (Uint64 '%lu))))
(let select_ '('v_ui64))
(let pgmReturn (AsList
(SetResult 'res (SelectRow 't_by_ui64 row_ select_))
))
(return pgmReturn)
))__";

THolder<TEvTablet::TEvLocalMKQL> reqRead = MakeHolder<TEvTablet::TEvLocalMKQL>();
reqRead->Record.MutableProgram()->MutableProgram()->SetText(Sprintf(readQuery, key));
runtime.Send(new IEventHandle(followerId, followerEdge, reqRead.Release()));

auto reply = runtime.GrabEdgeEvent<TEvTablet::TEvLocalMKQLResponse>(followerEdge);
UNIT_ASSERT_VALUES_EQUAL(reply->Get()->Record.GetStatus(), 0);
const auto res = reply->Get()->Record
.GetExecutionEngineEvaluatedResponse()
.GetValue()
.GetStruct(0)
.GetOptional();
if (!res.HasOptional()) {
return Nothing();
}

return res
.GetOptional()
.GetStruct(0)
.GetOptional()
.GetUint64();
};

// Perform basic sanity checks
UNIT_ASSERT_VALUES_EQUAL(doFollowerRead(41), Nothing());
UNIT_ASSERT_VALUES_EQUAL(doFollowerRead(42), 51u);
}

Y_UNIT_TEST(FollowerOfflineBoot) {
Expand Down

0 comments on commit 491dd86

Please sign in to comment.