Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KIKIMR-20150: fix and improve flaky test #713

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 106 additions & 8 deletions ydb/core/client/client_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2139,7 +2139,8 @@ Y_UNIT_TEST_SUITE(TClientTest) {
TPortManager tp;
ui16 port = tp.GetPort(2134);

const auto settings = TServerSettings(port);
const auto settings = TServerSettings(port)
.SetUseRealThreads(false);
TServer server(settings);
TClient client(settings);
SetupLogging(server);
Expand All @@ -2153,32 +2154,129 @@ Y_UNIT_TEST_SUITE(TClientTest) {
const TActorId edge = runtime.AllocateEdgeActor();

{
ui64 confirmationsCount = 0;
auto observeConfirmations = [&](TAutoPtr<IEventHandle>& ev) {
switch (ev->GetTypeRewrite()) {
case TEvBlobStorage::TEvPut::EventType: {
const auto* msg = ev->Get<TEvBlobStorage::TEvPut>();
// step 1 is snapshot
// step 2 is schema alter
// step 3 is expected write below
if (msg->Id.TabletID() == tabletId &&
msg->Id.Channel() == 0 &&
msg->Id.Cookie() == 1 &&
msg->Id.Step() > 2)
{
++confirmationsCount;
}
break;
}
}
return TTestActorRuntime::EEventAction::PROCESS;
};
runtime.SetObserverFunc(observeConfirmations);

const TActorId leaderTablet = runtime.Register(CreateTablet(edge, tabletInfo.Get(), setupInfo.Get(), 0, nullptr, nullptr));
const TActorId leaderId = runtime.GrabEdgeEvent<TEvTablet::TEvRestored>(edge)->Get()->UserTabletActor;
Y_UNUSED(leaderId);

// we use it to kill leader only when it has sent the write to the follower and it is confirmed
const TActorId followerTablet = runtime.Register(CreateTabletFollower(edge, tabletInfo.Get(), setupInfo.Get(), 1, nullptr, nullptr));

auto doLeaderWrite = [&](ui64 key, ui64 value) {
const char *writeQuery = R"__((
(let row_ '('('key (Uint64 '%lu))))
(let update_ '('('v_ui64 (Uint64 '%lu))))
(let result_ (UpdateRow 't_by_ui64 row_ update_))
(return (AsList result_))
))__";

THolder<TEvTablet::TEvLocalMKQL> reqWrite = MakeHolder<TEvTablet::TEvLocalMKQL>();
reqWrite->Record.MutableProgram()->MutableProgram()->SetText(Sprintf(writeQuery, key, value));
runtime.Send(new IEventHandle(leaderId, edge, reqWrite.Release()));

auto reply = runtime.GrabEdgeEvent<TEvTablet::TEvLocalMKQLResponse>(edge);
UNIT_ASSERT_VALUES_EQUAL(reply->Get()->Record.GetStatus(), 0);
};

doLeaderWrite(42, 51);

auto waitFor = [&](const auto& condition, const TString& description) {
if (!condition()) {
Cerr << "... waiting for " << description << Endl;
TDispatchOptions options;
options.CustomFinalCondition = [&]() {
return condition();
};
runtime.DispatchEvents(options);
UNIT_ASSERT_C(condition(), "... failed to wait for " << description);
}
};

waitFor([&](){ return confirmationsCount > 0; }, "Write confirmed");

runtime.Send(new IEventHandle(leaderTablet, edge, new TEvents::TEvPoisonPill()));
auto reply = runtime.GrabEdgeEvent<TEvTablet::TEvTabletDead>(edge);
UNIT_ASSERT_VALUES_EQUAL(reply->Get()->TabletID, tabletId);

runtime.Send(new IEventHandle(followerTablet, edge, new TEvents::TEvPoisonPill()));
reply = runtime.GrabEdgeEvent<TEvTablet::TEvTabletDead>(edge);
UNIT_ASSERT_VALUES_EQUAL(reply->Get()->TabletID, tabletId);
}

const TActorId followerTablet = runtime.Register(CreateTabletFollower(edge, tabletInfo.Get(), setupInfo.Get(), 1, nullptr, nullptr));
Y_UNUSED(followerTablet);
// now we start follower without its leader

const TActorId followerId = runtime.GrabEdgeEvent<TEvTablet::TEvRestored>(edge)->Get()->UserTabletActor;
Y_UNUSED(followerId);
const TActorId followerEdge = runtime.AllocateEdgeActor();
const TActorId followerTablet = runtime.Register(CreateTabletFollower(followerEdge, tabletInfo.Get(), setupInfo.Get(), 1, nullptr, nullptr));
Y_UNUSED(followerTablet);
const TActorId followerId = runtime.GrabEdgeEvent<TEvTablet::TEvRestored>(followerEdge)->Get()->UserTabletActor;

{
NTabletPipe::TClientConfig pipeClientConfig;
pipeClientConfig.AllowFollower = true;
pipeClientConfig.ForceFollower = true;
pipeClientConfig.RetryPolicy = {.RetryLimitCount = 2};
runtime.Register(NTabletPipe::CreateClient(edge, tabletId, pipeClientConfig));
runtime.Register(NTabletPipe::CreateClient(followerEdge, tabletId, pipeClientConfig));

auto reply = runtime.GrabEdgeEvent<TEvTabletPipe::TEvClientConnected>(edge);
auto reply = runtime.GrabEdgeEvent<TEvTabletPipe::TEvClientConnected>(followerEdge);

UNIT_ASSERT_VALUES_EQUAL(reply->Get()->Status, NKikimrProto::OK);
}

auto doFollowerRead = [&](ui64 key) -> TMaybe<ui64> {
const char *readQuery = R"__((
(let row_ '('('key (Uint64 '%lu))))
(let select_ '('v_ui64))
(let pgmReturn (AsList
(SetResult 'res (SelectRow 't_by_ui64 row_ select_))
))
(return pgmReturn)
))__";

THolder<TEvTablet::TEvLocalMKQL> reqRead = MakeHolder<TEvTablet::TEvLocalMKQL>();
reqRead->Record.MutableProgram()->MutableProgram()->SetText(Sprintf(readQuery, key));
runtime.Send(new IEventHandle(followerId, followerEdge, reqRead.Release()));

auto reply = runtime.GrabEdgeEvent<TEvTablet::TEvLocalMKQLResponse>(followerEdge);
UNIT_ASSERT_VALUES_EQUAL(reply->Get()->Record.GetStatus(), 0);
const auto res = reply->Get()->Record
.GetExecutionEngineEvaluatedResponse()
.GetValue()
.GetStruct(0)
.GetOptional();
if (!res.HasOptional()) {
return Nothing();
}

return res
.GetOptional()
.GetStruct(0)
.GetOptional()
.GetUint64();
};

// Perform basic sanity checks
UNIT_ASSERT_VALUES_EQUAL(doFollowerRead(41), Nothing());
UNIT_ASSERT_VALUES_EQUAL(doFollowerRead(42), 51u);
}

Y_UNIT_TEST(FollowerOfflineBoot) {
Expand Down
Loading