Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix followers not reading system tables and not processing requests #4314

Merged
merged 1 commit into from
May 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 30 additions & 1 deletion ydb/core/tablet_flat/flat_database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <ydb/core/util/pb.h>
#include <ydb/core/scheme_types/scheme_type_registry.h>
#include <util/generic/cast.h>
#include <util/stream/output.h>


#define MAX_REDO_BYTES_PER_COMMIT 268435456U // 256MB
Expand All @@ -21,6 +22,26 @@
namespace NKikimr {
namespace NTable {

bool TDatabase::TChangeCounter::operator<(const TChangeCounter& rhs) const {
if (Serial && rhs.Serial) {
// When both counters have serial they can be compared directly
return Serial < rhs.Serial;
}

if (Epoch == rhs.Epoch) {
// When this counter is (0, epoch) but rhs is (non-zero, epoch), it
// indicates rhs may have more changes. When serial is zero it means
// the current memtable is empty, but rhs epoch is the same, so it
// cannot have fewer changes.
return Serial < rhs.Serial;
}

// The one with the smaller epoch must have fewer changes. In the worst
// case that change may have been a flush (incrementing epoch and serial)
// and then compact (possibly resetting serial to zero).
return Epoch < rhs.Epoch;
}

TDatabase::TDatabase(TDatabaseImpl *databaseImpl) noexcept
: DatabaseImpl(databaseImpl ? databaseImpl : new TDatabaseImpl(0, new TScheme, nullptr))
, NoMoreReadsFlag(true)
Expand Down Expand Up @@ -500,7 +521,7 @@ void TDatabase::SetTableObserver(ui32 table, TIntrusivePtr<ITableObserver> ptr)
Require(table)->SetTableObserver(std::move(ptr));
}

TDatabase::TChg TDatabase::Head(ui32 table) const noexcept
TDatabase::TChangeCounter TDatabase::Head(ui32 table) const noexcept
{
if (table == Max<ui32>()) {
return { DatabaseImpl->Serial(), TEpoch::Max() };
Expand Down Expand Up @@ -844,3 +865,11 @@ void DebugDumpDb(const TDatabase &db) {
}

}}

Y_DECLARE_OUT_SPEC(, NKikimr::NTable::TDatabase::TChangeCounter, stream, value) {
stream << "TChangeCounter{serial=";
stream << value.Serial;
stream << ", epoch=";
stream << value.Epoch;
stream << "}";
}
45 changes: 35 additions & 10 deletions ydb/core/tablet_flat/flat_database.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,37 @@ class TDatabase {
TVector<std::function<void()>> OnPersistent;
};

struct TChg {
ui64 Serial;
TEpoch Epoch;
struct TChangeCounter {
/**
* Monotonic change counter for a table or an entire database. Serial
* is incremented and persisted on each successful Commit() that has
* data changes (i.e. not empty). Note: this may or may not be zero
* when table has no changes, or when all changes have been compacted.
*/
ui64 Serial = 0;

/**
* Monotonic epoch of a table's current memtable. This is incremented
* each time a memtable is flushed and a new one is started. The
* current memtable may or may not have additional changes.
*/
TEpoch Epoch = TEpoch::Zero();

TChangeCounter() = default;

TChangeCounter(ui64 serial, TEpoch epoch)
: Serial(serial)
, Epoch(epoch)
{}

bool operator==(const TChangeCounter& rhs) const = default;
bool operator!=(const TChangeCounter& rhs) const = default;

/**
* Compares two change counters, such that when a < b then b either
* has more changes than a, or it's impossible to determine.
*/
bool operator<(const TChangeCounter& rhs) const;
};

TDatabase(const TDatabase&) = delete;
Expand All @@ -60,14 +88,11 @@ class TDatabase {

void SetTableObserver(ui32 table, TIntrusivePtr<ITableObserver> ptr) noexcept;

/* Returns durable monotonic change number for table or entire database
on default (table = Max<ui32>()). Serial is incremented for each
successful Commit(). AHTUNG: Serial may go to the past in case of
migration to older db versions with (Evolution < 18). Thus do not
rely on durability until of kikimr stable 18-08.
/**
* Returns durable monotonic change counter for a table (or a database when
* table = Max<ui32>() by default).
*/

TChg Head(ui32 table = Max<ui32>()) const noexcept;
TChangeCounter Head(ui32 table = Max<ui32>()) const noexcept;

/*_ Call Next() before accessing each row including the 1st row. */

Expand Down
28 changes: 12 additions & 16 deletions ydb/core/tx/datashard/datashard__init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -694,10 +694,12 @@ bool TDataShard::SyncSchemeOnFollower(TTransactionContext &txc, const TActorCont
Y_ABORT_UNLESS(userTablesSchema, "UserTables");

// Check if tables changed since last time we synchronized them
ui64 lastSysUpdate = txc.DB.Head(Schema::Sys::TableId).Serial;
ui64 lastSchemeUpdate = txc.DB.Head(Schema::UserTables::TableId).Serial;
ui64 lastSnapshotsUpdate = scheme.GetTableInfo(Schema::Snapshots::TableId)
? txc.DB.Head(Schema::Snapshots::TableId).Serial : 0;
NTable::TDatabase::TChangeCounter lastSysUpdate = txc.DB.Head(Schema::Sys::TableId);
NTable::TDatabase::TChangeCounter lastSchemeUpdate = txc.DB.Head(Schema::UserTables::TableId);
NTable::TDatabase::TChangeCounter lastSnapshotsUpdate;
if (scheme.GetTableInfo(Schema::Snapshots::TableId)) {
lastSnapshotsUpdate = txc.DB.Head(Schema::Snapshots::TableId);
}

NIceDb::TNiceDb db(txc.DB);

Expand Down Expand Up @@ -733,10 +735,8 @@ bool TDataShard::SyncSchemeOnFollower(TTransactionContext &txc, const TActorCont
if (FollowerState.LastSysUpdate < lastSysUpdate) {
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
"Updating sys metadata on follower, tabletId " << TabletID()
<< " prevGen " << (FollowerState.LastSysUpdate >> 32)
<< " prevStep " << (FollowerState.LastSysUpdate & (ui32)-1)
<< " newGen " << (lastSysUpdate >> 32)
<< " newStep " << (lastSysUpdate & (ui32)-1));
<< " prev " << FollowerState.LastSysUpdate
<< " current " << lastSysUpdate);

bool ready = true;
ready &= SysGetUi64(db, Schema::Sys_PathOwnerId, PathOwnerId);
Expand All @@ -752,10 +752,8 @@ bool TDataShard::SyncSchemeOnFollower(TTransactionContext &txc, const TActorCont
if (FollowerState.LastSchemeUpdate < lastSchemeUpdate) {
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
"Updating tables metadata on follower, tabletId " << TabletID()
<< " prevGen " << (FollowerState.LastSchemeUpdate >> 32)
<< " prevStep " << (FollowerState.LastSchemeUpdate & (ui32)-1)
<< " newGen " << (lastSchemeUpdate >> 32)
<< " newStep " << (lastSchemeUpdate & (ui32)-1));
<< " prev " << FollowerState.LastSchemeUpdate
<< " current " << lastSchemeUpdate);

struct TRow {
TPathId TableId;
Expand Down Expand Up @@ -825,10 +823,8 @@ bool TDataShard::SyncSchemeOnFollower(TTransactionContext &txc, const TActorCont
if (FollowerState.LastSnapshotsUpdate < lastSnapshotsUpdate) {
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
"Updating snapshots metadata on follower, tabletId " << TabletID()
<< " prevGen " << (FollowerState.LastSnapshotsUpdate >> 32)
<< " prevStep " << (FollowerState.LastSnapshotsUpdate & (ui32)-1)
<< " newGen " << (lastSnapshotsUpdate >> 32)
<< " newStep " << (lastSnapshotsUpdate & (ui32)-1));
<< " prev " << FollowerState.LastSnapshotsUpdate
<< " current " << lastSnapshotsUpdate);

NIceDb::TNiceDb db(txc.DB);
if (!SnapshotManager.ReloadSnapshots(db)) {
Expand Down
4 changes: 1 addition & 3 deletions ydb/core/tx/datashard/datashard__stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -428,9 +428,7 @@ class TDataShard::TTxInitiateStatsUpdate : public NTabletFlatExecutor::TTransact
void CheckIdleMemCompaction(const TUserTable& table, TTransactionContext& txc, const TActorContext& ctx) {
// Note: we only care about changes in the main table
auto lastTableChange = txc.DB.Head(table.LocalTid);
if (table.LastTableChange.Serial != lastTableChange.Serial ||
table.LastTableChange.Epoch != lastTableChange.Epoch)
{
if (table.LastTableChange != lastTableChange) {
table.LastTableChange = lastTableChange;
table.LastTableChangeTimestamp = ctx.Monotonic();
return;
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/tx/datashard/datashard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -2395,9 +2395,9 @@ class TDataShard

// For follower only
struct TFollowerState {
ui64 LastSysUpdate = 0;
ui64 LastSchemeUpdate = 0;
ui64 LastSnapshotsUpdate = 0;
NTable::TDatabase::TChangeCounter LastSysUpdate;
NTable::TDatabase::TChangeCounter LastSchemeUpdate;
NTable::TDatabase::TChangeCounter LastSnapshotsUpdate;
};

//
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/datashard/datashard_user_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ struct TUserTable : public TThrRefBase {
mutable TStats Stats;
mutable bool StatsUpdateInProgress = false;
mutable bool StatsNeedUpdate = true;
mutable NTable::TDatabase::TChg LastTableChange{ 0, NTable::TEpoch::Zero() };
mutable NTable::TDatabase::TChangeCounter LastTableChange;
mutable TMonotonic LastTableChangeTimestamp;

ui32 SpecialColTablet = Max<ui32>();
Expand Down
96 changes: 96 additions & 0 deletions ydb/core/tx/datashard/datashard_ut_followers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
#include "datashard_ut_common_kqp.h"
#include "datashard_ut_read_table.h"

#include <ydb/library/actors/core/mon.h>

namespace NKikimr {

using namespace NKikimr::NDataShard;
Expand Down Expand Up @@ -162,6 +164,100 @@ Y_UNIT_TEST_SUITE(DataShardFollowers) {
}
}

Y_UNIT_TEST(FollowerRebootAfterSysCompaction) {
TPortManager pm;
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetUseRealThreads(false)
.SetEnableForceFollowers(true);

Tests::TServer::TPtr server = new TServer(serverSettings);
auto &runtime = *server->GetRuntime();
auto sender = runtime.AllocateEdgeActor();

runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);

InitRoot(server, sender);

CreateShardedTable(server, sender, "/Root", "table-1",
TShardedTableOptions()
.Shards(2)
.Followers(1));

const auto shards = GetTableShards(server, sender, "/Root/table-1");
UNIT_ASSERT_VALUES_EQUAL(shards.size(), 2u);

ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 11), (2, 22), (3, 33);");

// Wait for leader to promote the follower read edge (and stop writing to the Sys table)
Cerr << "... sleeping" << Endl;
runtime.SimulateSleep(TDuration::Seconds(1));

UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleStaleRoExec(runtime,
"SELECT key, value FROM `/Root/table-1` WHERE key >= 1 AND key <= 3",
"/Root"),
"{ items { uint32_value: 1 } items { uint32_value: 11 } }, "
"{ items { uint32_value: 2 } items { uint32_value: 22 } }, "
"{ items { uint32_value: 3 } items { uint32_value: 33 } }");

// Now we ask the leader to compact the Sys table
{
NActorsProto::TRemoteHttpInfo pb;
pb.SetMethod(HTTP_METHOD_GET);
pb.SetPath("/executorInternals");
auto* p1 = pb.AddQueryParams();
p1->SetKey("force_compaction");
p1->SetValue("1");
SendViaPipeCache(runtime, shards.at(0), sender,
std::make_unique<NMon::TEvRemoteHttpInfo>(std::move(pb)));
auto ev = runtime.GrabEdgeEventRethrow<NMon::TEvRemoteHttpInfoRes>(sender);
UNIT_ASSERT_C(
ev->Get()->Html.Contains("Table will be compacted in the near future"),
ev->Get()->Html);
}

// Allow table to finish compaction
Cerr << "... sleeping" << Endl;
runtime.SimulateSleep(TDuration::Seconds(1));

// Reboot follower
Cerr << "... killing follower" << Endl;
SendViaPipeCache(runtime, shards.at(0), sender,
std::make_unique<TEvents::TEvPoison>(),
{ .Follower = true });

// Allow it to boot properly
Cerr << "... sleeping" << Endl;
runtime.SimulateSleep(TDuration::Seconds(1));

// Read from follower must succeed
Cerr << "... checking" << Endl;
UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleStaleRoExec(runtime,
"SELECT key, value FROM `/Root/table-1` WHERE key >= 1 AND key <= 3",
"/Root"),
"{ items { uint32_value: 1 } items { uint32_value: 11 } }, "
"{ items { uint32_value: 2 } items { uint32_value: 22 } }, "
"{ items { uint32_value: 3 } items { uint32_value: 33 } }");

// Update row values and sleep
Cerr << "... updating rows" << Endl;
ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 44), (2, 55), (3, 66);");
runtime.SimulateSleep(TDuration::Seconds(1));

// Read from follower must see updated values
Cerr << "... checking" << Endl;
UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleStaleRoExec(runtime,
"SELECT key, value FROM `/Root/table-1` WHERE key >= 1 AND key <= 3",
"/Root"),
"{ items { uint32_value: 1 } items { uint32_value: 44 } }, "
"{ items { uint32_value: 2 } items { uint32_value: 55 } }, "
"{ items { uint32_value: 3 } items { uint32_value: 66 } }");
}

} // Y_UNIT_TEST_SUITE(DataShardFollowers)

} // namespace NKikimr
19 changes: 19 additions & 0 deletions ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "datashard_ut_common.h"

#include <ydb/core/base/tablet.h>
#include <ydb/core/base/tablet_pipecache.h>
#include <ydb/core/base/tablet_resolver.h>
#include <ydb/core/scheme/scheme_types_defs.h>
#include <ydb/core/scheme/scheme_types_proto.h>
Expand Down Expand Up @@ -2389,4 +2390,22 @@ TString ReadShardedTable(
return StartReadShardedTable(server, path, snapshot, /* pause = */ false).Result;
}

void SendViaPipeCache(
TTestActorRuntime& runtime,
ui64 tabletId, const TActorId& sender,
std::unique_ptr<IEventBase> msg,
const TSendViaPipeCacheOptions& options)
{
ui32 nodeIndex = sender.NodeId() - runtime.GetNodeId(0);
runtime.Send(
new IEventHandle(
MakePipePeNodeCacheID(options.Follower),
sender,
new TEvPipeCache::TEvForward(msg.release(), tabletId, options.Subscribe),
options.Flags,
options.Cookie),
nodeIndex,
/* viaActorSystem */ true);
}

}
13 changes: 13 additions & 0 deletions ydb/core/tx/datashard/ut_common/datashard_ut_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -819,4 +819,17 @@ void WaitFor(TTestActorRuntime& runtime, TCondition&& condition, const TString&
UNIT_ASSERT_C(condition(), "... failed to wait for " << description);
}

struct TSendViaPipeCacheOptions {
ui32 Flags = 0;
ui64 Cookie = 0;
bool Follower = false;
bool Subscribe = false;
};

void SendViaPipeCache(
TTestActorRuntime& runtime,
ui64 tabletId, const TActorId& sender,
std::unique_ptr<IEventBase> msg,
const TSendViaPipeCacheOptions& options = {});

}
Loading