Skip to content

Commit

Permalink
Vector index coordination [2/N] (#10462)
Browse files (browse the repository at this point in the history)
  • Loading branch information
MBkkt authored Oct 16, 2024
1 parent 97f68e9 commit f1972db
Showing 1 changed file with 47 additions and 49 deletions.
96 changes: 47 additions & 49 deletions ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -615,8 +615,20 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
Self->IndexBuildPipes.CloseAll(BuildId, ctx);
}

// Hands queued shards to `send` until either the ToUploadShards queue is
// empty or InProgressShards has reached Limits.MaxShards entries.
// Each shard is popped from the queue and recorded in InProgressShards
// before `send` is invoked for it.
// Returns true only when no shard remains in progress or queued.
template<typename Send>
bool SendToShards(TIndexBuildInfo& buildInfo, Send&& send) {
    for (;;) {
        const bool queueDrained = buildInfo.ToUploadShards.empty();
        if (queueDrained || !(buildInfo.InProgressShards.size() < buildInfo.Limits.MaxShards)) {
            break;
        }

        // Copy the index out before popping: front() refers to the element being removed.
        auto nextShard = buildInfo.ToUploadShards.front();
        buildInfo.ToUploadShards.pop_front();
        buildInfo.InProgressShards.emplace(nextShard);
        send(nextShard);
    }

    const bool nothingInFlight = buildInfo.InProgressShards.empty();
    return nothingInFlight && buildInfo.ToUploadShards.empty();
}

bool FillTable(TIndexBuildInfo& buildInfo) {
if (buildInfo.ToUploadShards.empty() && buildInfo.DoneShardsSize == 0 && buildInfo.InProgressShards.empty()) {
if (buildInfo.DoneShardsSize == 0 && buildInfo.ToUploadShards.empty() && buildInfo.InProgressShards.empty()) {
for (const auto& [idx, status] : buildInfo.Shards) {
switch (status.Status) {
case NKikimrIndexBuilder::EBuildStatus::INVALID:
Expand All @@ -634,21 +646,12 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
}
}
}

while (!buildInfo.ToUploadShards.empty() && buildInfo.InProgressShards.size() < buildInfo.Limits.MaxShards) {
auto shardIdx = buildInfo.ToUploadShards.front();
buildInfo.ToUploadShards.pop_front();
buildInfo.InProgressShards.emplace(shardIdx);

SendBuildIndexRequest(shardIdx, buildInfo);
}

return buildInfo.InProgressShards.empty() && buildInfo.ToUploadShards.empty() &&
return SendToShards(buildInfo, [&](TShardIdx shardIdx) { SendBuildIndexRequest(shardIdx, buildInfo); }) &&
buildInfo.DoneShardsSize == buildInfo.Shards.size();
}

void ComputeKMeansState(TIndexBuildInfo& buildInfo) {
if (!buildInfo.ToUploadShards.empty() || !buildInfo.InProgressShards.empty()) {
if (buildInfo.DoneShardsSize != 0 || !buildInfo.InProgressShards.empty() || !buildInfo.ToUploadShards.empty()) {
return;
}
std::array<NScheme::TTypeInfo, 1> typeInfos{NScheme::NTypeIds::Uint32};
Expand Down Expand Up @@ -689,17 +692,10 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
buildInfo.InProgressShards.clear();
return true;
}
while (!buildInfo.ToUploadShards.empty() && buildInfo.InProgressShards.size() < buildInfo.Limits.MaxShards) {
auto shardIdx = buildInfo.ToUploadShards.front();
buildInfo.ToUploadShards.pop_front();
buildInfo.InProgressShards.emplace(shardIdx);
SendSampleKRequest(shardIdx, buildInfo);
}
return buildInfo.ToUploadShards.empty() && buildInfo.InProgressShards.empty();
return SendToShards(buildInfo, [&](TShardIdx shardIdx) { SendSampleKRequest(shardIdx, buildInfo); });
}

bool FillVectorIndex(TIndexBuildInfo& buildInfo) {
ComputeKMeansState(buildInfo);
bool SendVectorIndex(TIndexBuildInfo& buildInfo) {
switch (buildInfo.KMeans.State) {
case TIndexBuildInfo::TKMeans::Sample:
return SendKMeansSample(buildInfo);
Expand All @@ -716,26 +712,13 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
return true;
}

bool FillIndex(TTransactionContext& txc, TIndexBuildInfo& buildInfo) {
if (!buildInfo.SnapshotTxId || !buildInfo.SnapshotStep) {
Y_ABORT_UNLESS(Self->TablesWithSnapshots.contains(buildInfo.TablePathId));
Y_ABORT_UNLESS(Self->TablesWithSnapshots.at(buildInfo.TablePathId) == buildInfo.InitiateTxId);

buildInfo.SnapshotTxId = buildInfo.InitiateTxId;
Y_ABORT_UNLESS(buildInfo.SnapshotTxId);
buildInfo.SnapshotStep = Self->SnapshotsStepIds.at(buildInfo.SnapshotTxId);
Y_ABORT_UNLESS(buildInfo.SnapshotStep);
}
if (buildInfo.Shards.empty()) {
NIceDb::TNiceDb db(txc.DB);
InitiateShards(db, buildInfo);
}
if (!buildInfo.IsBuildVectorIndex()) {
return FillTable(buildInfo);
}
if (!FillVectorIndex(buildInfo)) {
bool FillVectorIndex(TIndexBuildInfo& buildInfo) {
ComputeKMeansState(buildInfo);
if (!SendVectorIndex(buildInfo)) {
return false;
}
buildInfo.DoneShardsSize = 0;

if (!buildInfo.Sample.Sent && !buildInfo.Sample.Rows.empty()) {
SendUploadSampleKRequest(buildInfo);
return false;
Expand All @@ -745,6 +728,7 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
return false;
}
buildInfo.Sample.Clear();

if (buildInfo.KMeans.NextParent()) {
Progress(BuildId);
return false;
Expand All @@ -757,6 +741,28 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
return true;
}

// Entry point of the filling step for an index build.
// 1. If either snapshot field is unset, takes the snapshot tx id from
//    InitiateTxId and the step from Self->SnapshotsStepIds, aborting if the
//    bookkeeping in Self->TablesWithSnapshots does not match.
// 2. If no shards are registered for the build yet, calls InitiateShards.
// 3. Delegates to FillVectorIndex for vector index builds, otherwise to
//    FillTable, and returns the delegate's result.
bool FillIndex(TTransactionContext& txc, TIndexBuildInfo& buildInfo) {
    if (!(buildInfo.SnapshotTxId && buildInfo.SnapshotStep)) {
        // The table must already be registered with a snapshot owned by the initiating tx.
        Y_ABORT_UNLESS(Self->TablesWithSnapshots.contains(buildInfo.TablePathId));
        Y_ABORT_UNLESS(Self->TablesWithSnapshots.at(buildInfo.TablePathId) == buildInfo.InitiateTxId);

        buildInfo.SnapshotTxId = buildInfo.InitiateTxId;
        Y_ABORT_UNLESS(buildInfo.SnapshotTxId);

        buildInfo.SnapshotStep = Self->SnapshotsStepIds.at(buildInfo.SnapshotTxId);
        Y_ABORT_UNLESS(buildInfo.SnapshotStep);
    }

    if (buildInfo.Shards.empty()) {
        NIceDb::TNiceDb db(txc.DB);
        InitiateShards(db, buildInfo);
    }

    if (!buildInfo.IsBuildVectorIndex()) {
        // Anything that is not a vector index is expected to be a secondary index or a column build.
        Y_ASSERT(buildInfo.IsBuildSecondaryIndex() || buildInfo.IsBuildColumns());
        return FillTable(buildInfo);
    }
    return FillVectorIndex(buildInfo);
}

public:
explicit TTxProgress(TSelf* self, TIndexBuildId id)
: TTxBase(self, TXTYPE_PROGRESS_INDEX_BUILD)
Expand Down Expand Up @@ -1500,18 +1506,10 @@ struct TSchemeShard::TIndexBuilder::TTxReplyProgress: public TSchemeShard::TInde
break;

case NKikimrIndexBuilder::EBuildStatus::BUILD_ERROR:
buildInfo.Issue += TStringBuilder()
<< "One of the shards report BUILD_ERROR at Filling stage, process has to be canceled"
<< ", shardId: " << shardId
<< ", shardIdx: " << shardIdx;
Self->PersistBuildIndexIssue(db, buildInfo);
ChangeState(buildInfo.Id, TIndexBuildInfo::EState::Rejection_Applying);

Progress(buildId);
break;
case NKikimrIndexBuilder::EBuildStatus::BAD_REQUEST:
buildInfo.Issue += TStringBuilder()
<< "One of the shards report BAD_REQUEST at Filling stage, process has to be canceled"
<< "One of the shards report " << shardStatus.Status
<< " at Filling stage, process has to be canceled"
<< ", shardId: " << shardId
<< ", shardIdx: " << shardIdx;
Self->PersistBuildIndexIssue(db, buildInfo);
Expand Down

0 comments on commit f1972db

Please sign in to comment.