Skip to content

Commit

Permalink
Merge 74d8ac1 into d32d5d0
Browse files Browse the repository at this point in the history
  • Loading branch information
CyberROFL authored Jul 10, 2024
2 parents d32d5d0 + 74d8ac1 commit d3d5236
Show file tree
Hide file tree
Showing 10 changed files with 232 additions and 57 deletions.
1 change: 1 addition & 0 deletions ydb/core/protos/counters_datashard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -490,4 +490,5 @@ enum ETxTypes {
TXTYPE_CLEANUP_VOLATILE = 80 [(TxTypeOpts) = {Name: "TxCleanupVolatile"}];
TXTYPE_PLAN_PREDICTED_TXS = 81 [(TxTypeOpts) = {Name: "TxPlanPredictedTxs"}];
TXTYPE_WRITE = 82 [(TxTypeOpts) = {Name: "TxWrite"}];
TXTYPE_REMOVE_SCHEMA_SNAPSHOTS = 83 [(TxTypeOpts) = {Name: "TxRemoveSchemaSnapshots"}];
}
148 changes: 116 additions & 32 deletions ydb/core/tx/datashard/datashard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -855,6 +855,28 @@ void TDataShard::PersistChangeRecord(NIceDb::TNiceDb& db, const TChangeRecord& r
NIceDb::TUpdate<Schema::ChangeRecordDetails::Kind>(record.GetKind()),
NIceDb::TUpdate<Schema::ChangeRecordDetails::Body>(record.GetBody()),
NIceDb::TUpdate<Schema::ChangeRecordDetails::Source>(record.GetSource()));

auto res = ChangesQueue.emplace(record.GetOrder(), record);
Y_VERIFY_S(res.second, "Duplicate change record: " << record.GetOrder());

if (res.first->second.SchemaVersion) {
res.first->second.SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference(
TSchemaSnapshotKey(res.first->second.TableId, res.first->second.SchemaVersion));
}

db.GetDatabase().OnRollback([this, order = record.GetOrder()] {
auto it = ChangesQueue.find(order);
Y_VERIFY_S(it != ChangesQueue.end(), "Cannot find change record: " << order);

if (it->second.SchemaSnapshotAcquired) {
const auto snapshotKey = TSchemaSnapshotKey(it->second.TableId, it->second.SchemaVersion);
if (const auto last = SchemaSnapshotManager.ReleaseReference(snapshotKey)) {
ScheduleRemoveSchemaSnapshot(snapshotKey);
}
}

ChangesQueue.erase(it);
});
} else {
auto& state = LockChangeRecords[lockId];
Y_ABORT_UNLESS(state.Changes.empty() || state.Changes.back().LockOffset < record.GetLockOffset(),
Expand Down Expand Up @@ -934,6 +956,14 @@ void TDataShard::CommitLockChangeRecords(NIceDb::TNiceDb& db, ui64 lockId, ui64
committed.Step = rowVersion.Step;
committed.TxId = rowVersion.TxId;
collected.push_back(committed);

auto res = ChangesQueue.emplace(committed.Order, committed);
Y_VERIFY_S(res.second, "Duplicate change record: " << committed.Order);

if (res.first->second.SchemaVersion) {
res.first->second.SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference(
TSchemaSnapshotKey(res.first->second.TableId, res.first->second.SchemaVersion));
}
}

Y_VERIFY_S(!CommittedLockChangeRecords.contains(lockId), "Cannot commit lock " << lockId << " more than once");
Expand All @@ -960,7 +990,26 @@ void TDataShard::CommitLockChangeRecords(NIceDb::TNiceDb& db, ui64 lockId, ui64
LockChangeRecords.erase(it);
});
db.GetDatabase().OnRollback([this, lockId]() {
CommittedLockChangeRecords.erase(lockId);
auto it = CommittedLockChangeRecords.find(lockId);
Y_VERIFY_S(it != CommittedLockChangeRecords.end(), "Unexpected failure to find lockId# " << lockId);

for (size_t i = 0; i < it->second.Count; ++i) {
const ui64 order = it->second.Order + i;

auto cIt = ChangesQueue.find(order);
Y_VERIFY_S(cIt != ChangesQueue.end(), "Cannot find change record: " << order);

if (cIt->second.SchemaSnapshotAcquired) {
const auto snapshotKey = TSchemaSnapshotKey(cIt->second.TableId, cIt->second.SchemaVersion);
if (const auto last = SchemaSnapshotManager.ReleaseReference(snapshotKey)) {
ScheduleRemoveSchemaSnapshot(snapshotKey);
}
}

ChangesQueue.erase(cIt);
}

CommittedLockChangeRecords.erase(it);
});
}

Expand Down Expand Up @@ -1022,23 +1071,9 @@ void TDataShard::RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order) {
ChangesQueueBytes -= record.BodySize;

if (record.SchemaSnapshotAcquired) {
Y_ABORT_UNLESS(record.TableId);
auto tableIt = TableInfos.find(record.TableId.LocalPathId);

if (tableIt != TableInfos.end()) {
const auto snapshotKey = TSchemaSnapshotKey(record.TableId, record.SchemaVersion);
const bool last = SchemaSnapshotManager.ReleaseReference(snapshotKey);

if (last) {
const auto* snapshot = SchemaSnapshotManager.FindSnapshot(snapshotKey);
Y_ABORT_UNLESS(snapshot);

if (snapshot->Schema->GetTableSchemaVersion() < tableIt->second->GetTableSchemaVersion()) {
SchemaSnapshotManager.RemoveShapshot(db, snapshotKey);
}
}
} else {
Y_DEBUG_ABORT_UNLESS(State == TShardState::PreOffline);
const auto snapshotKey = TSchemaSnapshotKey(record.TableId, record.SchemaVersion);
if (const bool last = SchemaSnapshotManager.ReleaseReference(snapshotKey)) {
ScheduleRemoveSchemaSnapshot(snapshotKey);
}
}

Expand Down Expand Up @@ -1081,22 +1116,15 @@ void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange
for (const auto& record : records) {
forward.emplace_back(record.Order, record.PathId, record.BodySize);

auto res = ChangesQueue.emplace(
std::piecewise_construct,
std::forward_as_tuple(record.Order),
std::forward_as_tuple(record, now, cookie)
);
if (res.second) {
ChangesList.PushBack(&res.first->second);
auto it = ChangesQueue.find(record.Order);
Y_ABORT_UNLESS(it != ChangesQueue.end());

Y_ABORT_UNLESS(ChangesQueueBytes <= (Max<ui64>() - record.BodySize));
ChangesQueueBytes += record.BodySize;
it->second.EnqueuedAt = now;
it->second.ReservationCookie = cookie;
ChangesList.PushBack(&it->second);

if (record.SchemaVersion) {
res.first->second.SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference(
TSchemaSnapshotKey(record.TableId, record.SchemaVersion));
}
}
Y_ABORT_UNLESS(ChangesQueueBytes <= (Max<ui64>() - record.BodySize));
ChangesQueueBytes += record.BodySize;
}

if (auto it = ChangeQueueReservations.find(cookie); it != ChangeQueueReservations.end()) {
Expand Down Expand Up @@ -1265,6 +1293,14 @@ bool TDataShard::LoadChangeRecords(NIceDb::TNiceDb& db, TVector<IDataShardChange
.SchemaVersion = schemaVersion,
});

auto res = ChangesQueue.emplace(records.back().Order, records.back());
Y_VERIFY_S(res.second, "Duplicate change record: " << records.back().Order);

if (res.first->second.SchemaVersion) {
res.first->second.SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference(
TSchemaSnapshotKey(res.first->second.TableId, res.first->second.SchemaVersion));
}

if (!rowset.Next()) {
return false;
}
Expand Down Expand Up @@ -1363,6 +1399,14 @@ bool TDataShard::LoadChangeRecordCommits(NIceDb::TNiceDb& db, TVector<IDataShard
});
entry.Count++;
needSort = true;

auto res = ChangesQueue.emplace(records.back().Order, records.back());
Y_VERIFY_S(res.second, "Duplicate change record: " << records.back().Order);

if (res.first->second.SchemaVersion) {
res.first->second.SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference(
TSchemaSnapshotKey(res.first->second.TableId, res.first->second.SchemaVersion));
}
}

LockChangeRecords.erase(lockId);
Expand Down Expand Up @@ -1421,6 +1465,46 @@ void TDataShard::ScheduleRemoveAbandonedLockChanges() {
}
}

void TDataShard::ScheduleRemoveSchemaSnapshot(const TSchemaSnapshotKey& key) {
Y_ABORT_UNLESS(!SchemaSnapshotManager.HasReference(key));

const auto* snapshot = SchemaSnapshotManager.FindSnapshot(key);
Y_ABORT_UNLESS(snapshot);

auto it = TableInfos.find(key.PathId);
if (it == TableInfos.end()) {
Y_DEBUG_ABORT_UNLESS(State == TShardState::PreOffline);
return;
}

if (snapshot->Schema->GetTableSchemaVersion() < it->second->GetTableSchemaVersion()) {
bool wasEmpty = PendingSchemaSnapshotsToRemove.empty();
PendingSchemaSnapshotsToRemove.push_back(key);
if (wasEmpty) {
Send(SelfId(), new TEvPrivate::TEvRemoveSchemaSnapshots);
}
}
}

void TDataShard::ScheduleRemoveAbandonedSchemaSnapshots() {
bool wasEmpty = PendingSchemaSnapshotsToRemove.empty();

for (const auto& [key, snapshot] : SchemaSnapshotManager.GetSnapshots()) {
auto it = TableInfos.find(key.PathId);
if (it == TableInfos.end()) {
Y_DEBUG_ABORT_UNLESS(State == TShardState::PreOffline);
break;
}
if (snapshot.Schema->GetTableSchemaVersion() < it->second->GetTableSchemaVersion()) {
PendingSchemaSnapshotsToRemove.push_back(key);
}
}

if (wasEmpty && !PendingSchemaSnapshotsToRemove.empty()) {
Send(SelfId(), new TEvPrivate::TEvRemoveSchemaSnapshots);
}
}

void TDataShard::PersistSchemeTxResult(NIceDb::TNiceDb &db, const TSchemaOperation &op) {
db.Table<Schema::SchemaOperations>().Key(op.TxId).Update(
NIceDb::TUpdate<Schema::SchemaOperations::Success>(op.Success),
Expand Down
13 changes: 7 additions & 6 deletions ydb/core/tx/datashard/datashard__init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,12 @@ bool TDataShard::TTxInit::ReadEverything(TTransactionContext &txc) {
return false;
}

if (Self->State != TShardState::Offline && txc.DB.GetScheme().GetTableInfo(Schema::SchemaSnapshots::TableId)) {
if (!Self->SchemaSnapshotManager.Load(db)) {
return false;
}
}

if (Self->State != TShardState::Offline && txc.DB.GetScheme().GetTableInfo(Schema::ChangeRecords::TableId)) {
if (!Self->LoadChangeRecords(db, ChangeRecords)) {
return false;
Expand Down Expand Up @@ -512,12 +518,6 @@ bool TDataShard::TTxInit::ReadEverything(TTransactionContext &txc) {
}
}

if (Self->State != TShardState::Offline && txc.DB.GetScheme().GetTableInfo(Schema::SchemaSnapshots::TableId)) {
if (!Self->SchemaSnapshotManager.Load(db)) {
return false;
}
}

if (Self->State != TShardState::Offline && txc.DB.GetScheme().GetTableInfo(Schema::Locks::TableId)) {
TDataShardLocksDb locksDb(*Self, txc);
if (!Self->SysLocks.Load(locksDb)) {
Expand Down Expand Up @@ -547,6 +547,7 @@ bool TDataShard::TTxInit::ReadEverything(TTransactionContext &txc) {
Self->SubscribeNewLocks();

Self->ScheduleRemoveAbandonedLockChanges();
Self->ScheduleRemoveAbandonedSchemaSnapshots();

return true;
}
Expand Down
14 changes: 6 additions & 8 deletions ydb/core/tx/datashard/datashard_change_sending.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -340,15 +340,13 @@ class TDataShard::TTxRemoveChangeRecords: public TTransactionBase<TDataShard> {
Self->RemoveChangeRecordsInFly = false;
}

if (!Self->ChangesQueue) { // double check queue
if (ChangeExchangeSplit) {
Self->KillChangeSender(ctx);
Self->ChangeExchangeSplitter.DoSplit(ctx);
}
if (ChangeExchangeSplit) {
Self->KillChangeSender(ctx);
Self->ChangeExchangeSplitter.DoSplit(ctx);
}

for (const auto dstTabletId : ActivationList) {
Self->ChangeSenderActivator.DoSend(dstTabletId, ctx);
}
for (const auto dstTabletId : ActivationList) {
Self->ChangeSenderActivator.DoSend(dstTabletId, ctx);
}

Self->CheckStateChange(ctx);
Expand Down
32 changes: 24 additions & 8 deletions ydb/core/tx/datashard/datashard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ class TDataShard
class TTxCdcStreamScanProgress;
class TTxCdcStreamEmitHeartbeats;
class TTxUpdateFollowerReadEdge;
class TTxRemoveSchemaSnapshots;

template <typename T> friend class TTxDirectBase;
class TTxUploadRows;
Expand Down Expand Up @@ -374,6 +375,7 @@ class TDataShard
EvPlanPredictedTxs,
EvStatisticsScanFinished,
EvTableStatsError,
EvRemoveSchemaSnapshots,
EvEnd
};

Expand Down Expand Up @@ -595,6 +597,8 @@ class TDataShard
struct TEvPlanPredictedTxs : public TEventLocal<TEvPlanPredictedTxs, EvPlanPredictedTxs> {};

struct TEvStatisticsScanFinished : public TEventLocal<TEvStatisticsScanFinished, EvStatisticsScanFinished> {};

struct TEvRemoveSchemaSnapshots : public TEventLocal<TEvRemoveSchemaSnapshots, EvRemoveSchemaSnapshots> {};
};

struct Schema : NIceDb::Schema {
Expand Down Expand Up @@ -1383,6 +1387,8 @@ class TDataShard

void Handle(TEvPrivate::TEvPlanPredictedTxs::TPtr& ev, const TActorContext& ctx);

void Handle(TEvPrivate::TEvRemoveSchemaSnapshots::TPtr& ev, const TActorContext& ctx);

void HandleByReplicationSourceOffsetsServer(STATEFN_SIG);

void DoPeriodicTasks(const TActorContext &ctx);
Expand Down Expand Up @@ -1920,6 +1926,8 @@ class TDataShard
bool LoadChangeRecordCommits(NIceDb::TNiceDb& db, TVector<IDataShardChangeCollector::TChange>& records);
void ScheduleRemoveLockChanges(ui64 lockId);
void ScheduleRemoveAbandonedLockChanges();
void ScheduleRemoveSchemaSnapshot(const TSchemaSnapshotKey& key);
void ScheduleRemoveAbandonedSchemaSnapshots();

static void PersistCdcStreamScanLastKey(NIceDb::TNiceDb& db, const TSerializedCellVec& value,
const TPathId& tablePathId, const TPathId& streamPathId);
Expand Down Expand Up @@ -2803,24 +2811,29 @@ class TDataShard
ui64 LockOffset;
ui64 ReservationCookie;

explicit TEnqueuedRecord(ui64 bodySize, const TPathId& tableId,
ui64 schemaVersion, TInstant created, TInstant enqueued,
ui64 lockId = 0, ui64 lockOffset = 0, ui64 cookie = 0)
explicit TEnqueuedRecord(ui64 bodySize, const TPathId& tableId, ui64 schemaVersion,
TInstant created, ui64 lockId = 0, ui64 lockOffset = 0)
: BodySize(bodySize)
, TableId(tableId)
, SchemaVersion(schemaVersion)
, SchemaSnapshotAcquired(false)
, CreatedAt(created)
, EnqueuedAt(enqueued)
, EnqueuedAt(TInstant::Zero())
, LockId(lockId)
, LockOffset(lockOffset)
, ReservationCookie(cookie)
, ReservationCookie(0)
{
}

explicit TEnqueuedRecord(const IDataShardChangeCollector::TChange& record)
: TEnqueuedRecord(record.BodySize, record.TableId, record.SchemaVersion,
record.CreatedAt(), record.LockId, record.LockOffset)
{
}

explicit TEnqueuedRecord(const IDataShardChangeCollector::TChange& record, TInstant now, ui64 cookie)
: TEnqueuedRecord(record.BodySize, record.TableId, record.SchemaVersion, record.CreatedAt(), now,
record.LockId, record.LockOffset, cookie)
explicit TEnqueuedRecord(const TChangeRecord& record)
: TEnqueuedRecord(record.GetBody().size(), record.GetTableId(), record.GetSchemaVersion(),
record.GetApproximateCreationDateTime(), record.GetLockId(), record.GetLockOffset())
{
}
};
Expand Down Expand Up @@ -2865,6 +2878,7 @@ class TDataShard
THashMap<ui64, TUncommittedLockChangeRecords> LockChangeRecords; // ui64 is lock id
THashMap<ui64, TCommittedLockChangeRecords> CommittedLockChangeRecords; // ui64 is lock id
TVector<ui64> PendingLockChangeRecordsToRemove;
TVector<TSchemaSnapshotKey> PendingSchemaSnapshotsToRemove;

// in
THashMap<ui64, TInChangeSender> InChangeSenders; // ui64 is shard id
Expand Down Expand Up @@ -2985,6 +2999,7 @@ class TDataShard
HFuncTraced(TEvMediatorTimecast::TEvNotifyPlanStep, Handle);
HFuncTraced(TEvPrivate::TEvMediatorRestoreBackup, Handle);
HFuncTraced(TEvPrivate::TEvRemoveLockChangeRecords, Handle);
HFuncTraced(TEvPrivate::TEvRemoveSchemaSnapshots, Handle);
default:
if (!HandleDefaultEvents(ev, SelfId())) {
ALOG_WARN(NKikimrServices::TX_DATASHARD, "TDataShard::StateInactive unhandled event type: " << ev->GetTypeRewrite()
Expand Down Expand Up @@ -3113,6 +3128,7 @@ class TDataShard
HFunc(TEvPrivate::TEvPlanPredictedTxs, Handle);
HFunc(NStat::TEvStatistics::TEvStatisticsRequest, Handle);
HFunc(TEvPrivate::TEvStatisticsScanFinished, Handle);
HFuncTraced(TEvPrivate::TEvRemoveSchemaSnapshots, Handle);
default:
if (!HandleDefaultEvents(ev, SelfId())) {
ALOG_WARN(NKikimrServices::TX_DATASHARD, "TDataShard::StateWork unhandled event type: " << ev->GetTypeRewrite() << " event: " << ev->ToString());
Expand Down
Loading

0 comments on commit d3d5236

Please sign in to comment.