Skip to content

Commit

Permalink
Merge 4c1b056 into 0e4b669
Browse files Browse the repository at this point in the history
  • Loading branch information
CyberROFL authored Jul 12, 2024
2 parents 0e4b669 + 4c1b056 commit ebbd0c1
Show file tree
Hide file tree
Showing 23 changed files with 662 additions and 150 deletions.
2 changes: 1 addition & 1 deletion ydb/core/change_exchange/change_sender_common_ops.h
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ class TBaseChangeSender {
Y_ABORT_UNLESS(it != Broadcasting.end());

auto& broadcast = it->second;
if (broadcast.Partitions.contains(partitionId)) {
if (broadcast.CompletedPartitions.contains(partitionId)) {
return false;
}

Expand Down
3 changes: 2 additions & 1 deletion ydb/core/persqueue/partition_sourcemanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ void TPartitionSourceManager::TModificationBatch::Cancel() {
}

bool TPartitionSourceManager::TModificationBatch::HasModifications() const {
return !SourceIdWriter.GetSourceIdsToWrite().empty();
return !SourceIdWriter.GetSourceIdsToWrite().empty()
|| !SourceIdWriter.GetSourceIdsToDelete().empty();
}

void TPartitionSourceManager::TModificationBatch::FillRequest(TEvKeyValue::TEvRequest* request) {
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/persqueue/sourceid.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ class TSourceIdWriter {
return Registrations;
}

const THashSet<TString>& GetSourceIdsToDelete() const {
return Deregistrations;
}

template <typename... Args>
void RegisterSourceId(const TString& sourceId, Args&&... args) {
Registrations[sourceId] = TSourceIdInfo(std::forward<Args>(args)...);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/counters_datashard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -490,4 +490,5 @@ enum ETxTypes {
TXTYPE_CLEANUP_VOLATILE = 80 [(TxTypeOpts) = {Name: "TxCleanupVolatile"}];
TXTYPE_PLAN_PREDICTED_TXS = 81 [(TxTypeOpts) = {Name: "TxPlanPredictedTxs"}];
TXTYPE_WRITE = 82 [(TxTypeOpts) = {Name: "TxWrite"}];
TXTYPE_REMOVE_SCHEMA_SNAPSHOTS = 83 [(TxTypeOpts) = {Name: "TxRemoveSchemaSnapshots"}];
}
187 changes: 150 additions & 37 deletions ydb/core/tx/datashard/datashard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -855,6 +855,39 @@ void TDataShard::PersistChangeRecord(NIceDb::TNiceDb& db, const TChangeRecord& r
NIceDb::TUpdate<Schema::ChangeRecordDetails::Kind>(record.GetKind()),
NIceDb::TUpdate<Schema::ChangeRecordDetails::Body>(record.GetBody()),
NIceDb::TUpdate<Schema::ChangeRecordDetails::Source>(record.GetSource()));

auto res = ChangesQueue.emplace(record.GetOrder(), record);
Y_VERIFY_S(res.second, "Duplicate change record: " << record.GetOrder());

if (res.first->second.SchemaVersion) {
res.first->second.SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference(
TSchemaSnapshotKey(res.first->second.TableId, res.first->second.SchemaVersion));
}

if (CommittingChangeRecords.empty()) {
db.GetDatabase().OnCommit([this] {
CommittingChangeRecords.clear();
});
db.GetDatabase().OnRollback([this] {
for (const auto order : CommittingChangeRecords) {
auto cIt = ChangesQueue.find(order);
Y_VERIFY_S(cIt != ChangesQueue.end(), "Cannot find change record: " << order);

if (cIt->second.SchemaSnapshotAcquired) {
const auto snapshotKey = TSchemaSnapshotKey(cIt->second.TableId, cIt->second.SchemaVersion);
if (const auto last = SchemaSnapshotManager.ReleaseReference(snapshotKey)) {
ScheduleRemoveSchemaSnapshot(snapshotKey);
}
}

ChangesQueue.erase(cIt);
}

CommittingChangeRecords.clear();
});
}

CommittingChangeRecords.push_back(record.GetOrder());
} else {
auto& state = LockChangeRecords[lockId];
Y_ABORT_UNLESS(state.Changes.empty() || state.Changes.back().LockOffset < record.GetLockOffset(),
Expand Down Expand Up @@ -934,6 +967,14 @@ void TDataShard::CommitLockChangeRecords(NIceDb::TNiceDb& db, ui64 lockId, ui64
committed.Step = rowVersion.Step;
committed.TxId = rowVersion.TxId;
collected.push_back(committed);

auto res = ChangesQueue.emplace(committed.Order, committed);
Y_VERIFY_S(res.second, "Duplicate change record: " << committed.Order);

if (res.first->second.SchemaVersion) {
res.first->second.SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference(
TSchemaSnapshotKey(res.first->second.TableId, res.first->second.SchemaVersion));
}
}

Y_VERIFY_S(!CommittedLockChangeRecords.contains(lockId), "Cannot commit lock " << lockId << " more than once");
Expand All @@ -960,7 +1001,26 @@ void TDataShard::CommitLockChangeRecords(NIceDb::TNiceDb& db, ui64 lockId, ui64
LockChangeRecords.erase(it);
});
db.GetDatabase().OnRollback([this, lockId]() {
CommittedLockChangeRecords.erase(lockId);
auto it = CommittedLockChangeRecords.find(lockId);
Y_VERIFY_S(it != CommittedLockChangeRecords.end(), "Unexpected failure to find lockId# " << lockId);

for (size_t i = 0; i < it->second.Count; ++i) {
const ui64 order = it->second.Order + i;

auto cIt = ChangesQueue.find(order);
Y_VERIFY_S(cIt != ChangesQueue.end(), "Cannot find change record: " << order);

if (cIt->second.SchemaSnapshotAcquired) {
const auto snapshotKey = TSchemaSnapshotKey(cIt->second.TableId, cIt->second.SchemaVersion);
if (const auto last = SchemaSnapshotManager.ReleaseReference(snapshotKey)) {
ScheduleRemoveSchemaSnapshot(snapshotKey);
}
}

ChangesQueue.erase(cIt);
}

CommittedLockChangeRecords.erase(it);
});
}

Expand Down Expand Up @@ -994,7 +1054,6 @@ void TDataShard::RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order) {

auto it = ChangesQueue.find(order);
if (it == ChangesQueue.end()) {
Y_VERIFY_DEBUG_S(false, "Trying to remove non-enqueud record: " << order);
return;
}

Expand Down Expand Up @@ -1022,23 +1081,9 @@ void TDataShard::RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order) {
ChangesQueueBytes -= record.BodySize;

if (record.SchemaSnapshotAcquired) {
Y_ABORT_UNLESS(record.TableId);
auto tableIt = TableInfos.find(record.TableId.LocalPathId);

if (tableIt != TableInfos.end()) {
const auto snapshotKey = TSchemaSnapshotKey(record.TableId, record.SchemaVersion);
const bool last = SchemaSnapshotManager.ReleaseReference(snapshotKey);

if (last) {
const auto* snapshot = SchemaSnapshotManager.FindSnapshot(snapshotKey);
Y_ABORT_UNLESS(snapshot);

if (snapshot->Schema->GetTableSchemaVersion() < tableIt->second->GetTableSchemaVersion()) {
SchemaSnapshotManager.RemoveShapshot(db, snapshotKey);
}
}
} else {
Y_DEBUG_ABORT_UNLESS(State == TShardState::PreOffline);
const auto snapshotKey = TSchemaSnapshotKey(record.TableId, record.SchemaVersion);
if (const bool last = SchemaSnapshotManager.ReleaseReference(snapshotKey)) {
ScheduleRemoveSchemaSnapshot(snapshotKey);
}
}

Expand All @@ -1059,7 +1104,7 @@ void TDataShard::RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order) {
CheckChangesQueueNoOverflow();
}

void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange>&& records, ui64 cookie) {
void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange>&& records, ui64 cookie, bool afterMove) {
if (!records) {
return;
}
Expand All @@ -1079,27 +1124,24 @@ void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange
const auto now = AppData()->TimeProvider->Now();
TVector<NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TRecordInfo> forward(Reserve(records.size()));
for (const auto& record : records) {
forward.emplace_back(record.Order, record.PathId, record.BodySize);
auto it = ChangesQueue.find(record.Order);
if (it == ChangesQueue.end()) {
Y_ABORT_UNLESS(afterMove);
continue;
}

auto res = ChangesQueue.emplace(
std::piecewise_construct,
std::forward_as_tuple(record.Order),
std::forward_as_tuple(record, now, cookie)
);
if (res.second) {
ChangesList.PushBack(&res.first->second);
forward.emplace_back(record.Order, record.PathId, record.BodySize);

Y_ABORT_UNLESS(ChangesQueueBytes <= (Max<ui64>() - record.BodySize));
ChangesQueueBytes += record.BodySize;
it->second.EnqueuedAt = now;
it->second.ReservationCookie = cookie;
ChangesList.PushBack(&it->second);

if (record.SchemaVersion) {
res.first->second.SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference(
TSchemaSnapshotKey(record.TableId, record.SchemaVersion));
}
}
Y_ABORT_UNLESS(ChangesQueueBytes <= (Max<ui64>() - record.BodySize));
ChangesQueueBytes += record.BodySize;
}

if (auto it = ChangeQueueReservations.find(cookie); it != ChangeQueueReservations.end()) {
Y_ABORT_UNLESS(!afterMove);
ChangeQueueReservedCapacity -= it->second;
ChangeQueueReservedCapacity += records.size();
}
Expand Down Expand Up @@ -1265,6 +1307,14 @@ bool TDataShard::LoadChangeRecords(NIceDb::TNiceDb& db, TVector<IDataShardChange
.SchemaVersion = schemaVersion,
});

auto res = ChangesQueue.emplace(records.back().Order, records.back());
Y_VERIFY_S(res.second, "Duplicate change record: " << records.back().Order);

if (res.first->second.SchemaVersion) {
res.first->second.SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference(
TSchemaSnapshotKey(res.first->second.TableId, res.first->second.SchemaVersion));
}

if (!rowset.Next()) {
return false;
}
Expand Down Expand Up @@ -1363,6 +1413,14 @@ bool TDataShard::LoadChangeRecordCommits(NIceDb::TNiceDb& db, TVector<IDataShard
});
entry.Count++;
needSort = true;

auto res = ChangesQueue.emplace(records.back().Order, records.back());
Y_VERIFY_S(res.second, "Duplicate change record: " << records.back().Order);

if (res.first->second.SchemaVersion) {
res.first->second.SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference(
TSchemaSnapshotKey(res.first->second.TableId, res.first->second.SchemaVersion));
}
}

LockChangeRecords.erase(lockId);
Expand Down Expand Up @@ -1421,6 +1479,51 @@ void TDataShard::ScheduleRemoveAbandonedLockChanges() {
}
}

void TDataShard::ScheduleRemoveSchemaSnapshot(const TSchemaSnapshotKey& key) {
Y_ABORT_UNLESS(!SchemaSnapshotManager.HasReference(key));

const auto* snapshot = SchemaSnapshotManager.FindSnapshot(key);
Y_ABORT_UNLESS(snapshot);

auto it = TableInfos.find(key.PathId);
if (it == TableInfos.end()) {
Y_DEBUG_ABORT_UNLESS(State == TShardState::PreOffline);
return;
}

if (snapshot->Schema->GetTableSchemaVersion() < it->second->GetTableSchemaVersion()) {
bool wasEmpty = PendingSchemaSnapshotsToGc.empty();
PendingSchemaSnapshotsToGc.push_back(key);
if (wasEmpty) {
Send(SelfId(), new TEvPrivate::TEvRemoveSchemaSnapshots);
}
}
}

void TDataShard::ScheduleRemoveAbandonedSchemaSnapshots() {
bool wasEmpty = PendingSchemaSnapshotsToGc.empty();

for (const auto& [key, snapshot] : SchemaSnapshotManager.GetSnapshots()) {
auto it = TableInfos.find(key.PathId);
if (it == TableInfos.end()) {
Y_DEBUG_ABORT_UNLESS(State == TShardState::PreOffline);
break;
}
if (SchemaSnapshotManager.HasReference(key)) {
continue;
}
if (snapshot.Schema->GetTableSchemaVersion() >= it->second->GetTableSchemaVersion()) {
continue;
}

PendingSchemaSnapshotsToGc.push_back(key);
}

if (wasEmpty && !PendingSchemaSnapshotsToGc.empty()) {
Send(SelfId(), new TEvPrivate::TEvRemoveSchemaSnapshots);
}
}

void TDataShard::PersistSchemeTxResult(NIceDb::TNiceDb &db, const TSchemaOperation &op) {
db.Table<Schema::SchemaOperations>().Key(op.TxId).Update(
NIceDb::TUpdate<Schema::SchemaOperations::Success>(op.Success),
Expand Down Expand Up @@ -1649,8 +1752,18 @@ void TDataShard::AddSchemaSnapshot(const TPathId& pathId, ui64 tableSchemaVersio
Y_ABORT_UNLESS(TableInfos.contains(pathId.LocalPathId));
auto tableInfo = TableInfos[pathId.LocalPathId];

const auto key = TSchemaSnapshotKey(pathId.OwnerId, pathId.LocalPathId, tableSchemaVersion);
const auto key = TSchemaSnapshotKey(pathId, tableSchemaVersion);
SchemaSnapshotManager.AddSnapshot(txc.DB, key, TSchemaSnapshot(tableInfo, step, txId));

const auto& snapshots = SchemaSnapshotManager.GetSnapshots();
for (auto it = snapshots.lower_bound(TSchemaSnapshotKey(pathId, 1)); it != snapshots.end(); ++it) {
if (it->first == key) {
break;
}
if (!SchemaSnapshotManager.HasReference(it->first)) {
ScheduleRemoveSchemaSnapshot(it->first);
}
}
}

void TDataShard::PersistLastLoanTableTid(NIceDb::TNiceDb& db, ui32 localTid) {
Expand Down
13 changes: 7 additions & 6 deletions ydb/core/tx/datashard/datashard__init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,12 @@ bool TDataShard::TTxInit::ReadEverything(TTransactionContext &txc) {
return false;
}

if (Self->State != TShardState::Offline && txc.DB.GetScheme().GetTableInfo(Schema::SchemaSnapshots::TableId)) {
if (!Self->SchemaSnapshotManager.Load(db)) {
return false;
}
}

if (Self->State != TShardState::Offline && txc.DB.GetScheme().GetTableInfo(Schema::ChangeRecords::TableId)) {
if (!Self->LoadChangeRecords(db, ChangeRecords)) {
return false;
Expand Down Expand Up @@ -512,12 +518,6 @@ bool TDataShard::TTxInit::ReadEverything(TTransactionContext &txc) {
}
}

if (Self->State != TShardState::Offline && txc.DB.GetScheme().GetTableInfo(Schema::SchemaSnapshots::TableId)) {
if (!Self->SchemaSnapshotManager.Load(db)) {
return false;
}
}

if (Self->State != TShardState::Offline && txc.DB.GetScheme().GetTableInfo(Schema::Locks::TableId)) {
TDataShardLocksDb locksDb(*Self, txc);
if (!Self->SysLocks.Load(locksDb)) {
Expand Down Expand Up @@ -547,6 +547,7 @@ bool TDataShard::TTxInit::ReadEverything(TTransactionContext &txc) {
Self->SubscribeNewLocks();

Self->ScheduleRemoveAbandonedLockChanges();
Self->ScheduleRemoveAbandonedSchemaSnapshots();

return true;
}
Expand Down
12 changes: 4 additions & 8 deletions ydb/core/tx/datashard/datashard_change_sending.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ class TDataShard::TTxRemoveChangeRecords: public TTransactionBase<TDataShard> {
ChangeExchangeSplit = true;
} else {
for (const auto dstTabletId : Self->ChangeSenderActivator.GetDstSet()) {
if (Self->SplitSrcSnapshotSender.Acked(dstTabletId)) {
if (Self->SplitSrcSnapshotSender.Acked(dstTabletId) && !Self->ChangeSenderActivator.Acked(dstTabletId)) {
ActivationList.insert(dstTabletId);
}
}
Expand Down Expand Up @@ -346,9 +346,7 @@ class TDataShard::TTxRemoveChangeRecords: public TTransactionBase<TDataShard> {
}

for (const auto dstTabletId : ActivationList) {
if (!Self->ChangeSenderActivator.Acked(dstTabletId)) {
Self->ChangeSenderActivator.DoSend(dstTabletId, ctx);
}
Self->ChangeSenderActivator.DoSend(dstTabletId, ctx);
}

Self->CheckStateChange(ctx);
Expand Down Expand Up @@ -383,7 +381,7 @@ class TDataShard::TTxChangeExchangeSplitAck: public TTransactionBase<TDataShard>
Y_ABORT_UNLESS(Self->ChangeExchangeSplitter.Done());

for (const auto dstTabletId : Self->ChangeSenderActivator.GetDstSet()) {
if (Self->SplitSrcSnapshotSender.Acked(dstTabletId)) {
if (Self->SplitSrcSnapshotSender.Acked(dstTabletId) && !Self->ChangeSenderActivator.Acked(dstTabletId)) {
ActivationList.insert(dstTabletId);
}
}
Expand All @@ -396,9 +394,7 @@ class TDataShard::TTxChangeExchangeSplitAck: public TTransactionBase<TDataShard>
<< ", at tablet# " << Self->TabletID());

for (const auto dstTabletId : ActivationList) {
if (!Self->ChangeSenderActivator.Acked(dstTabletId)) {
Self->ChangeSenderActivator.DoSend(dstTabletId, ctx);
}
Self->ChangeSenderActivator.DoSend(dstTabletId, ctx);
}
}

Expand Down
Loading

0 comments on commit ebbd0c1

Please sign in to comment.