Skip to content

Commit

Permalink
Fix followers not reading system tables and not processing requests (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
snaury authored May 7, 2024
1 parent 47b04b7 commit 971d7f8
Show file tree
Hide file tree
Showing 9 changed files with 210 additions and 34 deletions.
31 changes: 30 additions & 1 deletion ydb/core/tablet_flat/flat_database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <ydb/core/util/pb.h>
#include <ydb/core/scheme_types/scheme_type_registry.h>
#include <util/generic/cast.h>
#include <util/stream/output.h>


#define MAX_REDO_BYTES_PER_COMMIT 268435456U // 256MB
Expand All @@ -21,6 +22,26 @@
namespace NKikimr {
namespace NTable {

bool TDatabase::TChangeCounter::operator<(const TChangeCounter& rhs) const {
if (Serial && rhs.Serial) {
// When both counters have serial they can be compared directly
return Serial < rhs.Serial;
}

if (Epoch == rhs.Epoch) {
// When this counter is (0, epoch) but rhs is (non-zero, epoch), it
// indicates rhs may have more changes. When serial is zero it means
// the current memtable is empty, but rhs epoch is the same, so it
// cannot have fewer changes.
return Serial < rhs.Serial;
}

// The one with the smaller epoch must have fewer changes. In the worst
// case that change may have been a flush (incrementing epoch and serial)
// and then compact (possibly resetting serial to zero).
return Epoch < rhs.Epoch;
}

TDatabase::TDatabase(TDatabaseImpl *databaseImpl) noexcept
: DatabaseImpl(databaseImpl ? databaseImpl : new TDatabaseImpl(0, new TScheme, nullptr))
, NoMoreReadsFlag(true)
Expand Down Expand Up @@ -500,7 +521,7 @@ void TDatabase::SetTableObserver(ui32 table, TIntrusivePtr<ITableObserver> ptr)
Require(table)->SetTableObserver(std::move(ptr));
}

TDatabase::TChg TDatabase::Head(ui32 table) const noexcept
TDatabase::TChangeCounter TDatabase::Head(ui32 table) const noexcept
{
if (table == Max<ui32>()) {
return { DatabaseImpl->Serial(), TEpoch::Max() };
Expand Down Expand Up @@ -844,3 +865,11 @@ void DebugDumpDb(const TDatabase &db) {
}

}}

Y_DECLARE_OUT_SPEC(, NKikimr::NTable::TDatabase::TChangeCounter, stream, value) {
stream << "TChangeCounter{serial=";
stream << value.Serial;
stream << ", epoch=";
stream << value.Epoch;
stream << "}";
}
45 changes: 35 additions & 10 deletions ydb/core/tablet_flat/flat_database.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,37 @@ class TDatabase {
TVector<std::function<void()>> OnPersistent;
};

struct TChg {
ui64 Serial;
TEpoch Epoch;
struct TChangeCounter {
/**
* Monotonic change counter for a table or an entire database. Serial
* is incremented and persisted on each successful Commit() that has
* data changes (i.e. not empty). Note: this may or may not be zero
* when table has no changes, or when all changes have been compacted.
*/
ui64 Serial = 0;

/**
* Monotonic epoch of a table's current memtable. This is incremented
* each time a memtable is flushed and a new one is started. The
* current memtable may or may not have additional changes.
*/
TEpoch Epoch = TEpoch::Zero();

TChangeCounter() = default;

TChangeCounter(ui64 serial, TEpoch epoch)
: Serial(serial)
, Epoch(epoch)
{}

bool operator==(const TChangeCounter& rhs) const = default;
bool operator!=(const TChangeCounter& rhs) const = default;

/**
* Compares two change counters, such that when a < b then b either
* has more changes than a, or it's impossible to determine.
*/
bool operator<(const TChangeCounter& rhs) const;
};

TDatabase(const TDatabase&) = delete;
Expand All @@ -60,14 +88,11 @@ class TDatabase {

void SetTableObserver(ui32 table, TIntrusivePtr<ITableObserver> ptr) noexcept;

/* Returns durable monotonic change number for table or entire database
on default (table = Max<ui32>()). Serial is incremented for each
successful Commit(). AHTUNG: Serial may go to the past in case of
migration to older db versions with (Evolution < 18). Thus do not
rely on durability until of kikimr stable 18-08.
/**
* Returns durable monotonic change counter for a table (or a database when
* table = Max<ui32>() by default).
*/

TChg Head(ui32 table = Max<ui32>()) const noexcept;
TChangeCounter Head(ui32 table = Max<ui32>()) const noexcept;

/*_ Call Next() before accessing each row including the 1st row. */

Expand Down
28 changes: 12 additions & 16 deletions ydb/core/tx/datashard/datashard__init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -694,10 +694,12 @@ bool TDataShard::SyncSchemeOnFollower(TTransactionContext &txc, const TActorCont
Y_ABORT_UNLESS(userTablesSchema, "UserTables");

// Check if tables changed since last time we synchronized them
ui64 lastSysUpdate = txc.DB.Head(Schema::Sys::TableId).Serial;
ui64 lastSchemeUpdate = txc.DB.Head(Schema::UserTables::TableId).Serial;
ui64 lastSnapshotsUpdate = scheme.GetTableInfo(Schema::Snapshots::TableId)
? txc.DB.Head(Schema::Snapshots::TableId).Serial : 0;
NTable::TDatabase::TChangeCounter lastSysUpdate = txc.DB.Head(Schema::Sys::TableId);
NTable::TDatabase::TChangeCounter lastSchemeUpdate = txc.DB.Head(Schema::UserTables::TableId);
NTable::TDatabase::TChangeCounter lastSnapshotsUpdate;
if (scheme.GetTableInfo(Schema::Snapshots::TableId)) {
lastSnapshotsUpdate = txc.DB.Head(Schema::Snapshots::TableId);
}

NIceDb::TNiceDb db(txc.DB);

Expand Down Expand Up @@ -733,10 +735,8 @@ bool TDataShard::SyncSchemeOnFollower(TTransactionContext &txc, const TActorCont
if (FollowerState.LastSysUpdate < lastSysUpdate) {
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
"Updating sys metadata on follower, tabletId " << TabletID()
<< " prevGen " << (FollowerState.LastSysUpdate >> 32)
<< " prevStep " << (FollowerState.LastSysUpdate & (ui32)-1)
<< " newGen " << (lastSysUpdate >> 32)
<< " newStep " << (lastSysUpdate & (ui32)-1));
<< " prev " << FollowerState.LastSysUpdate
<< " current " << lastSysUpdate);

bool ready = true;
ready &= SysGetUi64(db, Schema::Sys_PathOwnerId, PathOwnerId);
Expand All @@ -752,10 +752,8 @@ bool TDataShard::SyncSchemeOnFollower(TTransactionContext &txc, const TActorCont
if (FollowerState.LastSchemeUpdate < lastSchemeUpdate) {
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
"Updating tables metadata on follower, tabletId " << TabletID()
<< " prevGen " << (FollowerState.LastSchemeUpdate >> 32)
<< " prevStep " << (FollowerState.LastSchemeUpdate & (ui32)-1)
<< " newGen " << (lastSchemeUpdate >> 32)
<< " newStep " << (lastSchemeUpdate & (ui32)-1));
<< " prev " << FollowerState.LastSchemeUpdate
<< " current " << lastSchemeUpdate);

struct TRow {
TPathId TableId;
Expand Down Expand Up @@ -825,10 +823,8 @@ bool TDataShard::SyncSchemeOnFollower(TTransactionContext &txc, const TActorCont
if (FollowerState.LastSnapshotsUpdate < lastSnapshotsUpdate) {
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
"Updating snapshots metadata on follower, tabletId " << TabletID()
<< " prevGen " << (FollowerState.LastSnapshotsUpdate >> 32)
<< " prevStep " << (FollowerState.LastSnapshotsUpdate & (ui32)-1)
<< " newGen " << (lastSnapshotsUpdate >> 32)
<< " newStep " << (lastSnapshotsUpdate & (ui32)-1));
<< " prev " << FollowerState.LastSnapshotsUpdate
<< " current " << lastSnapshotsUpdate);

NIceDb::TNiceDb db(txc.DB);
if (!SnapshotManager.ReloadSnapshots(db)) {
Expand Down
4 changes: 1 addition & 3 deletions ydb/core/tx/datashard/datashard__stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -426,9 +426,7 @@ class TDataShard::TTxInitiateStatsUpdate : public NTabletFlatExecutor::TTransact
void CheckIdleMemCompaction(const TUserTable& table, TTransactionContext& txc, const TActorContext& ctx) {
// Note: we only care about changes in the main table
auto lastTableChange = txc.DB.Head(table.LocalTid);
if (table.LastTableChange.Serial != lastTableChange.Serial ||
table.LastTableChange.Epoch != lastTableChange.Epoch)
{
if (table.LastTableChange != lastTableChange) {
table.LastTableChange = lastTableChange;
table.LastTableChangeTimestamp = ctx.Monotonic();
return;
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/tx/datashard/datashard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -2420,9 +2420,9 @@ class TDataShard

// For follower only
struct TFollowerState {
ui64 LastSysUpdate = 0;
ui64 LastSchemeUpdate = 0;
ui64 LastSnapshotsUpdate = 0;
NTable::TDatabase::TChangeCounter LastSysUpdate;
NTable::TDatabase::TChangeCounter LastSchemeUpdate;
NTable::TDatabase::TChangeCounter LastSnapshotsUpdate;
};

//
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/datashard/datashard_user_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ struct TUserTable : public TThrRefBase {
mutable TStats Stats;
mutable bool StatsUpdateInProgress = false;
mutable bool StatsNeedUpdate = true;
mutable NTable::TDatabase::TChg LastTableChange{ 0, NTable::TEpoch::Zero() };
mutable NTable::TDatabase::TChangeCounter LastTableChange;
mutable TMonotonic LastTableChangeTimestamp;

ui32 SpecialColTablet = Max<ui32>();
Expand Down
96 changes: 96 additions & 0 deletions ydb/core/tx/datashard/datashard_ut_followers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
#include "datashard_ut_common_kqp.h"
#include "datashard_ut_read_table.h"

#include <ydb/library/actors/core/mon.h>

namespace NKikimr {

using namespace NKikimr::NDataShard;
Expand Down Expand Up @@ -162,6 +164,100 @@ Y_UNIT_TEST_SUITE(DataShardFollowers) {
}
}

Y_UNIT_TEST(FollowerRebootAfterSysCompaction) {
TPortManager pm;
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetUseRealThreads(false)
.SetEnableForceFollowers(true);

Tests::TServer::TPtr server = new TServer(serverSettings);
auto &runtime = *server->GetRuntime();
auto sender = runtime.AllocateEdgeActor();

runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);

InitRoot(server, sender);

CreateShardedTable(server, sender, "/Root", "table-1",
TShardedTableOptions()
.Shards(2)
.Followers(1));

const auto shards = GetTableShards(server, sender, "/Root/table-1");
UNIT_ASSERT_VALUES_EQUAL(shards.size(), 2u);

ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 11), (2, 22), (3, 33);");

// Wait for leader to promote the follower read edge (and stop writing to the Sys table)
Cerr << "... sleeping" << Endl;
runtime.SimulateSleep(TDuration::Seconds(1));

UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleStaleRoExec(runtime,
"SELECT key, value FROM `/Root/table-1` WHERE key >= 1 AND key <= 3",
"/Root"),
"{ items { uint32_value: 1 } items { uint32_value: 11 } }, "
"{ items { uint32_value: 2 } items { uint32_value: 22 } }, "
"{ items { uint32_value: 3 } items { uint32_value: 33 } }");

// Now we ask the leader to compact the Sys table
{
NActorsProto::TRemoteHttpInfo pb;
pb.SetMethod(HTTP_METHOD_GET);
pb.SetPath("/executorInternals");
auto* p1 = pb.AddQueryParams();
p1->SetKey("force_compaction");
p1->SetValue("1");
SendViaPipeCache(runtime, shards.at(0), sender,
std::make_unique<NMon::TEvRemoteHttpInfo>(std::move(pb)));
auto ev = runtime.GrabEdgeEventRethrow<NMon::TEvRemoteHttpInfoRes>(sender);
UNIT_ASSERT_C(
ev->Get()->Html.Contains("Table will be compacted in the near future"),
ev->Get()->Html);
}

// Allow table to finish compaction
Cerr << "... sleeping" << Endl;
runtime.SimulateSleep(TDuration::Seconds(1));

// Reboot follower
Cerr << "... killing follower" << Endl;
SendViaPipeCache(runtime, shards.at(0), sender,
std::make_unique<TEvents::TEvPoison>(),
{ .Follower = true });

// Allow it to boot properly
Cerr << "... sleeping" << Endl;
runtime.SimulateSleep(TDuration::Seconds(1));

// Read from follower must succeed
Cerr << "... checking" << Endl;
UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleStaleRoExec(runtime,
"SELECT key, value FROM `/Root/table-1` WHERE key >= 1 AND key <= 3",
"/Root"),
"{ items { uint32_value: 1 } items { uint32_value: 11 } }, "
"{ items { uint32_value: 2 } items { uint32_value: 22 } }, "
"{ items { uint32_value: 3 } items { uint32_value: 33 } }");

// Update row values and sleep
Cerr << "... updating rows" << Endl;
ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 44), (2, 55), (3, 66);");
runtime.SimulateSleep(TDuration::Seconds(1));

// Read from follower must see updated values
Cerr << "... checking" << Endl;
UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleStaleRoExec(runtime,
"SELECT key, value FROM `/Root/table-1` WHERE key >= 1 AND key <= 3",
"/Root"),
"{ items { uint32_value: 1 } items { uint32_value: 44 } }, "
"{ items { uint32_value: 2 } items { uint32_value: 55 } }, "
"{ items { uint32_value: 3 } items { uint32_value: 66 } }");
}

} // Y_UNIT_TEST_SUITE(DataShardFollowers)

} // namespace NKikimr
19 changes: 19 additions & 0 deletions ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "datashard_ut_common.h"

#include <ydb/core/base/tablet.h>
#include <ydb/core/base/tablet_pipecache.h>
#include <ydb/core/base/tablet_resolver.h>
#include <ydb/core/scheme/scheme_types_defs.h>
#include <ydb/core/scheme/scheme_types_proto.h>
Expand Down Expand Up @@ -2389,4 +2390,22 @@ TString ReadShardedTable(
return StartReadShardedTable(server, path, snapshot, /* pause = */ false).Result;
}

void SendViaPipeCache(
TTestActorRuntime& runtime,
ui64 tabletId, const TActorId& sender,
std::unique_ptr<IEventBase> msg,
const TSendViaPipeCacheOptions& options)
{
ui32 nodeIndex = sender.NodeId() - runtime.GetNodeId(0);
runtime.Send(
new IEventHandle(
MakePipePeNodeCacheID(options.Follower),
sender,
new TEvPipeCache::TEvForward(msg.release(), tabletId, options.Subscribe),
options.Flags,
options.Cookie),
nodeIndex,
/* viaActorSystem */ true);
}

}
13 changes: 13 additions & 0 deletions ydb/core/tx/datashard/ut_common/datashard_ut_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -819,4 +819,17 @@ void WaitFor(TTestActorRuntime& runtime, TCondition&& condition, const TString&
UNIT_ASSERT_C(condition(), "... failed to wait for " << description);
}

struct TSendViaPipeCacheOptions {
ui32 Flags = 0;
ui64 Cookie = 0;
bool Follower = false;
bool Subscribe = false;
};

void SendViaPipeCache(
TTestActorRuntime& runtime,
ui64 tabletId, const TActorId& sender,
std::unique_ptr<IEventBase> msg,
const TSendViaPipeCacheOptions& options = {});

}

0 comments on commit 971d7f8

Please sign in to comment.