Skip to content

Commit

Permalink
Index build: do not lose the requested partitioning info of indexImpl…
Browse files Browse the repository at this point in the history
…Tables in case of SchemeShard reboots (#10579)
  • Loading branch information
jepett0 authored Oct 21, 2024
1 parent 7e95d38 commit 5c4d7fa
Show file tree
Hide file tree
Showing 12 changed files with 383 additions and 16 deletions.
26 changes: 23 additions & 3 deletions ydb/core/tx/schemeshard/schemeshard_build_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ void TSchemeShard::Handle(TEvPrivate::TEvIndexBuildingMakeABill::TPtr& ev, const

void TSchemeShard::PersistCreateBuildIndex(NIceDb::TNiceDb& db, const TIndexBuildInfo& info) {
Y_ABORT_UNLESS(info.BuildKind != TIndexBuildInfo::EBuildKind::BuildKindUnspecified);
db.Table<Schema::IndexBuild>().Key(info.Id).Update(
auto persistedBuildIndex = db.Table<Schema::IndexBuild>().Key(info.Id);
persistedBuildIndex.Update(
NIceDb::TUpdate<Schema::IndexBuild::Uid>(info.Uid),
NIceDb::TUpdate<Schema::IndexBuild::DomainOwnerId>(info.DomainPathId.OwnerId),
NIceDb::TUpdate<Schema::IndexBuild::DomainLocalId>(info.DomainPathId.LocalPathId),
Expand All @@ -59,9 +60,28 @@ void TSchemeShard::PersistCreateBuildIndex(NIceDb::TNiceDb& db, const TIndexBuil
NIceDb::TUpdate<Schema::IndexBuild::MaxShards>(info.Limits.MaxShards),
NIceDb::TUpdate<Schema::IndexBuild::MaxRetries>(info.Limits.MaxRetries),
NIceDb::TUpdate<Schema::IndexBuild::BuildKind>(ui32(info.BuildKind))

// TODO save info.ImplTableDescriptions
);
// Persist details of the index build operation: ImplTableDescriptions and SpecializedIndexDescription.
// We have chosen TIndexCreationConfig's string representation as the serialization format.
if (bool hasSpecializedDescription = !std::holds_alternative<std::monostate>(info.SpecializedIndexDescription);
info.ImplTableDescriptions || hasSpecializedDescription
) {
NKikimrSchemeOp::TIndexCreationConfig serializableRepresentation;

for (const auto& description : info.ImplTableDescriptions) {
*serializableRepresentation.AddIndexImplTableDescriptions() = description;
}

std::visit([&]<typename T>(const T& specializedDescription) {
if constexpr (std::is_same_v<T, NKikimrSchemeOp::TVectorIndexKmeansTreeDescription>) {
*serializableRepresentation.MutableVectorIndexKmeansTreeDescription() = specializedDescription;
}
}, info.SpecializedIndexDescription);

persistedBuildIndex.Update(
NIceDb::TUpdate<Schema::IndexBuild::CreationConfig>(serializableRepresentation.SerializeAsString())
);
}

ui32 columnNo = 0;
for (ui32 i = 0; i < info.IndexColumns.size(); ++i, ++columnNo) {
Expand Down
25 changes: 22 additions & 3 deletions ydb/core/tx/schemeshard/schemeshard_info_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -2991,7 +2991,7 @@ struct TIndexBuildInfo: public TSimpleRefCount<TIndexBuildInfo> {
// TODO(mbkkt) move to TVectorIndexKmeansTreeDescription
ui32 K = 4;
ui32 Levels = 5;

// progress
enum EState : ui32 {
Sample = 0,
Expand All @@ -3007,7 +3007,7 @@ struct TIndexBuildInfo: public TSimpleRefCount<TIndexBuildInfo> {
EState State = Sample;

ui32 ChildBegin = 1; // included

static ui32 BinPow(ui32 k, ui32 l) {
ui32 r = 1;
while (l != 0) {
Expand Down Expand Up @@ -3282,7 +3282,26 @@ struct TIndexBuildInfo: public TSimpleRefCount<TIndexBuildInfo> {
indexInfo->IndexName = row.template GetValue<Schema::IndexBuild::IndexName>();
indexInfo->IndexType = row.template GetValue<Schema::IndexBuild::IndexType>();

// TODO load indexInfo->ImplTableDescriptions
// Restore the operation details: ImplTableDescriptions and SpecializedIndexDescription.
if (row.template HaveValue<Schema::IndexBuild::CreationConfig>()) {
NKikimrSchemeOp::TIndexCreationConfig creationConfig;
Y_ABORT_UNLESS(creationConfig.ParseFromString(row.template GetValue<Schema::IndexBuild::CreationConfig>()));

auto& descriptions = *creationConfig.MutableIndexImplTableDescriptions();
indexInfo->ImplTableDescriptions.reserve(descriptions.size());
for (auto& description : descriptions) {
indexInfo->ImplTableDescriptions.emplace_back(std::move(description));
}

switch (creationConfig.GetSpecializedIndexDescriptionCase()) {
case NKikimrSchemeOp::TIndexCreationConfig::kVectorIndexKmeansTreeDescription:
indexInfo->SpecializedIndexDescription = std::move(*creationConfig.MutableVectorIndexKmeansTreeDescription());
break;
case NKikimrSchemeOp::TIndexCreationConfig::SPECIALIZEDINDEXDESCRIPTION_NOT_SET:
/* do nothing */
break;
}
}

indexInfo->State = TIndexBuildInfo::EState(
row.template GetValue<Schema::IndexBuild::State>());
Expand Down
6 changes: 5 additions & 1 deletion ydb/core/tx/schemeshard/schemeshard_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -1325,6 +1325,9 @@ struct Schema : NIceDb::Schema {
struct AlterMainTableTxStatus : Column<32, NScheme::NTypeIds::Uint32> { using Type = NKikimrScheme::EStatus; };
struct AlterMainTableTxDone : Column<33, NScheme::NTypeIds::Bool> {};

// Serialized as string NKikimrSchemeOp::TIndexCreationConfig protobuf.
struct CreationConfig : Column<34, NScheme::NTypeIds::String> { using Type = TString; };

using TKey = TableKey<Id>;
using TColumns = TableColumns<
Id,
Expand Down Expand Up @@ -1359,7 +1362,8 @@ struct Schema : NIceDb::Schema {
BuildKind,
AlterMainTableTxId,
AlterMainTableTxStatus,
AlterMainTableTxDone
AlterMainTableTxDone,
CreationConfig
>;
};

Expand Down
22 changes: 17 additions & 5 deletions ydb/core/tx/schemeshard/ut_helpers/helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1699,11 +1699,23 @@ namespace NSchemeShardUT_Private {
} break;
case NKikimrSchemeOp::EIndexTypeGlobalVectorKmeansTree: {
auto& settings = *index.mutable_global_vector_kmeans_tree_index();
settings = Ydb::Table::GlobalVectorKMeansTreeIndex();
// some random valid settings
settings.mutable_vector_settings()->mutable_settings()->set_vector_type(Ydb::Table::VectorIndexSettings::VECTOR_TYPE_FLOAT);
settings.mutable_vector_settings()->mutable_settings()->set_vector_dimension(42);
settings.mutable_vector_settings()->mutable_settings()->set_metric(Ydb::Table::VectorIndexSettings::DISTANCE_COSINE);

auto& vectorIndexSettings = *settings.mutable_vector_settings()->mutable_settings();
if (cfg.VectorIndexSettings) {
cfg.VectorIndexSettings->SerializeTo(vectorIndexSettings);
} else {
// some random valid settings
vectorIndexSettings.set_vector_type(Ydb::Table::VectorIndexSettings::VECTOR_TYPE_FLOAT);
vectorIndexSettings.set_vector_dimension(42);
vectorIndexSettings.set_metric(Ydb::Table::VectorIndexSettings::DISTANCE_COSINE);
}

if (cfg.GlobalIndexSettings) {
cfg.GlobalIndexSettings[0].SerializeTo(*settings.mutable_level_table_settings());
if (cfg.GlobalIndexSettings.size() > 1) {
cfg.GlobalIndexSettings[1].SerializeTo(*settings.mutable_posting_table_settings());
}
}
} break;
default:
UNIT_ASSERT_C(false, "Unknown index type: " << static_cast<ui32>(cfg.IndexType));
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/tx/schemeshard/ut_helpers/helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@

// Forward declarations of the SDK settings types referenced by the index build
// test configuration declared further down in this header. Declaring them here
// keeps the corresponding SDK header out of this file; the vector index settings
// member is held through std::unique_ptr precisely so that a forward declaration suffices.
namespace NYdb::NTable {
struct TGlobalIndexSettings;
struct TVectorIndexSettings;
}

namespace NSchemeShardUT_Private {
Expand Down Expand Up @@ -371,6 +372,8 @@ namespace NSchemeShardUT_Private {
TVector<TString> IndexColumns;
TVector<TString> DataColumns;
TVector<NYdb::NTable::TGlobalIndexSettings> GlobalIndexSettings = {};
// implementation note: it was made a pointer, not optional, to enable forward declaration
std::unique_ptr<NYdb::NTable::TVectorIndexSettings> VectorIndexSettings = {};
};

std::unique_ptr<TEvIndexBuilder::TEvCreateRequest> CreateBuildColumnRequest(ui64 id, const TString& dbName, const TString& src, const TString& columnName, const Ydb::TypedValue& literal);
Expand Down
56 changes: 54 additions & 2 deletions ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -853,7 +853,7 @@ TCheckFunc IndexDataColumns(const TVector<TString>& dataColumnNames) {
};
}

TCheckFunc VectorIndexDescription(Ydb::Table::VectorIndexSettings_Metric metric,
TCheckFunc VectorIndexDescription(Ydb::Table::VectorIndexSettings_Metric metric,
Ydb::Table::VectorIndexSettings_VectorType vectorType,
ui32 vectorDimension
) {
Expand Down Expand Up @@ -1309,11 +1309,63 @@ TCheckFunc PartitionKeys(TVector<TString> lastShardKeys) {
const auto& pathDescr = record.GetPathDescription();
UNIT_ASSERT_VALUES_EQUAL(lastShardKeys.size(), pathDescr.TablePartitionsSize());
for (size_t i = 0; i < lastShardKeys.size(); ++i) {
UNIT_ASSERT_STRING_CONTAINS(pathDescr.GetTablePartitions(i).GetEndOfRangeKeyPrefix(), lastShardKeys[i]);
const auto& partition = pathDescr.GetTablePartitions(i);
UNIT_ASSERT_STRING_CONTAINS_C(
partition.GetEndOfRangeKeyPrefix(), lastShardKeys[i],
"partition index: " << i << '\n'
<< "actual key prefix: " << partition.GetEndOfRangeKeyPrefix().Quote() << '\n'
<< "expected key prefix: " << lastShardKeys[i].Quote() << '\n'
);
}
};
}

namespace {

// Converts values of type T to and from the serialized cell vector string representation.
template <typename T>
struct TSplitBoundarySerializer {
    // Wraps the value into a single cell and returns the serialized buffer of that one-cell vector.
    static TString Serialize(T splitBoundary) {
        const TCell boundaryCell = TCell::Make(splitBoundary);
        const TArrayRef<const TCell> singleCell(&boundaryCell, 1);
        TSerializedCellVec serialized(singleCell);
        return serialized.ReleaseBuffer();
    }

    // Parses the serialized cell vector and returns the values of its leading non-null cells.
    // Collection stops at the first null cell, which is treated as the end of the meaningful prefix.
    static TVector<T> Deserialize(const TString& serializedCells) {
        TSerializedCellVec parsed(serializedCells);
        const auto cells = parsed.GetCells();

        TVector<T> values;
        for (size_t i = 0; i < cells.size() && !cells[i].IsNull(); ++i) {
            values.emplace_back(cells[i].AsValue<T>());
        }
        return values;
    }
};

}

// Builds a check that the described table is split at the expected boundaries.
//
// The check asserts that:
//   * the table has expectedBoundaries.size() + 1 partitions;
//   * for each of the first expectedBoundaries.size() partitions, the first non-null cell
//     of its end-of-range key prefix equals the corresponding expected boundary.
//
// A key prefix with no non-null cells is reported as an assertion failure with the partition
// index and the quoted prefix, rather than surfacing as an out-of-range access.
template <typename T>
TCheckFunc SplitBoundaries(TVector<T>&& expectedBoundaries) {
    return [expectedBoundaries = std::move(expectedBoundaries)] (const NKikimrScheme::TEvDescribeSchemeResult& record) {
        const auto& pathDescr = record.GetPathDescription();
        // N split boundaries divide the table into N + 1 partitions.
        UNIT_ASSERT_VALUES_EQUAL(pathDescr.TablePartitionsSize(), expectedBoundaries.size() + 1);
        for (size_t i = 0; i < expectedBoundaries.size(); ++i) {
            const auto& partition = pathDescr.GetTablePartitions(i);
            const auto& keyPrefix = partition.GetEndOfRangeKeyPrefix();
            const auto actualBoundaries = TSplitBoundarySerializer<T>::Deserialize(keyPrefix);
            UNIT_ASSERT_C(
                !actualBoundaries.empty(),
                "partition index: " << i << '\n'
                    << "no non-null cells in the key prefix: " << keyPrefix.Quote() << '\n'
            );
            UNIT_ASSERT_VALUES_EQUAL_C(
                actualBoundaries.front(), expectedBoundaries[i],
                "partition index: " << i << '\n'
                    << "actual key prefix: " << keyPrefix.Quote() << '\n'
            );
        }
    };
}

// The template is defined in this translation unit only, so every instantiation used by tests
// has to be listed explicitly.
template TCheckFunc SplitBoundaries<ui32>(TVector<ui32>&&);

TCheckFunc ServerlessComputeResourcesMode(NKikimrSubDomains::EServerlessComputeResourcesMode serverlessComputeResourcesMode) {
return [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) {
UNIT_ASSERT_C(IsGoodDomainStatus(record.GetStatus()), "Unexpected status: " << record.GetStatus());
Expand Down
6 changes: 5 additions & 1 deletion ydb/core/tx/schemeshard/ut_helpers/ls_checks.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ namespace NLs {
void CheckBoundaries(const NKikimrScheme::TEvDescribeSchemeResult& record);
TCheckFunc PartitionCount(ui32 count);
TCheckFunc PartitionKeys(TVector<TString> lastShardKeys);
// Checks if the serialized representation of an expected boundary is a prefix of the actual one.
// Similar to PartitionKeys check, but does not require you to pass split boundaries in a serialized form.
template <typename T>
TCheckFunc SplitBoundaries(TVector<T>&& expectedBoundaries);
TCheckFunc FollowerCount(ui32 count);
TCheckFunc CrossDataCenterFollowerCount(ui32 count);
TCheckFunc AllowFollowerPromotion(bool val);
Expand Down Expand Up @@ -141,7 +145,7 @@ namespace NLs {
TCheckFunc IndexState(NKikimrSchemeOp::EIndexState state);
TCheckFunc IndexKeys(const TVector<TString>& keyNames);
TCheckFunc IndexDataColumns(const TVector<TString>& dataColumnNames);

TCheckFunc VectorIndexDescription(Ydb::Table::VectorIndexSettings_Metric metric,
Ydb::Table::VectorIndexSettings_VectorType vectorType,
ui32 vectorDimension
Expand Down
72 changes: 72 additions & 0 deletions ydb/core/tx/schemeshard/ut_index_build/ut_index_build.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -934,6 +934,78 @@ Y_UNIT_TEST_SUITE(IndexBuildTest) {
}
}

// Checks that the partitioning requested for the index implementation table is not lost
// when SchemeShard is rebooted while the index build operation is still in flight.
Y_UNIT_TEST(IndexPartitioningIsPersisted) {
TTestBasicRuntime runtime;
TTestEnv env(runtime);
ui64 txId = 100;

// Main table with a Utf8 column that the index is built on.
TestCreateTable(runtime, ++txId, "/MyRoot", R"(
Name: "Table"
Columns { Name: "key" Type: "Uint64" }
Columns { Name: "value" Type: "Utf8" }
KeyColumnNames: [ "key" ]
)");
env.TestWaitNotification(runtime, txId);

// Requested index table settings: split points at "alice" and "bob",
// with both min and max partition counts pinned to 3.
Ydb::Table::GlobalIndexSettings settings;
UNIT_ASSERT(google::protobuf::TextFormat::ParseFromString(R"(
partition_at_keys {
split_points {
type { tuple_type { elements { optional_type { item { type_id: UTF8 } } } } }
value { items { text_value: "alice" } }
}
split_points {
type { tuple_type { elements { optional_type { item { type_id: UTF8 } } } } }
value { items { text_value: "bob" } }
}
}
partitioning_settings {
min_partitions_count: 3
max_partitions_count: 3
}
)", &settings));

// Hold back every TEvModifySchemeTransaction whose first transaction is ESchemeOpCreateIndexBuild,
// so that the reboot below happens before that transaction is delivered.
TBlockEvents<TEvSchemeShard::TEvModifySchemeTransaction> indexCreationBlocker(runtime, [](const auto& ev) {
const auto& modifyScheme = ev->Get()->Record.GetTransaction(0);
return modifyScheme.GetOperationType() == NKikimrSchemeOp::ESchemeOpCreateIndexBuild;
});

// Start the index build, passing the settings above as the global index settings.
const ui64 buildIndexTx = ++txId;
TestBuildIndex(runtime, buildIndexTx, TTestTxConfig::SchemeShard, "/MyRoot", "/MyRoot/Table", TBuildIndexConfig{
"Index", NKikimrSchemeOp::EIndexTypeGlobal, { "value" }, {},
{ NYdb::NTable::TGlobalIndexSettings::FromProto(settings) }
});

// Reboot SchemeShard while the index creation transaction is still blocked.
// NOTE(review): presumably this forces the build state to be reloaded from the local database — confirm.
RebootTablet(runtime, TTestTxConfig::SchemeShard, runtime.AllocateEdgeActor());

// Stop blocking, release the held events and wait for the build to finish.
indexCreationBlocker.Stop().Unblock();
env.TestWaitNotification(runtime, buildIndexTx);

// The build must be reported as done; on failure the full operation description is printed.
auto buildIndexOperation = TestGetBuildIndex(runtime, TTestTxConfig::SchemeShard, "/MyRoot", buildIndexTx);
UNIT_ASSERT_VALUES_EQUAL_C(
buildIndexOperation.GetIndexBuild().GetState(), Ydb::Table::IndexBuildState::STATE_DONE,
buildIndexOperation.DebugString()
);

TestDescribeResult(DescribePath(runtime, "/MyRoot/Table"), {
NLs::IsTable,
NLs::IndexesCount(1)
});

TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Index"), {
NLs::PathExist,
NLs::IndexState(NKikimrSchemeOp::EIndexState::EIndexStateReady)
});

// The implementation table must have the requested partitioning.
// PartitionKeys checks that each expected string is contained in the partition's
// end-of-range key prefix, so the empty string given for the last partition matches any value.
TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Index/indexImplTable", true, true), {
NLs::IsTable,
NLs::PartitionCount(3),
NLs::MinPartitionsCountEqual(3),
NLs::MaxPartitionsCountEqual(3),
NLs::PartitionKeys({"alice", "bob", ""})
});
}

Y_UNIT_TEST(DropIndex) {
TTestBasicRuntime runtime;
TTestEnv env(runtime);
Expand Down
Loading

0 comments on commit 5c4d7fa

Please sign in to comment.