Skip to content

Commit

Permalink
fixes and reduce schema memory
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 committed Sep 7, 2024
1 parent 09076f0 commit 0a1d586
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 63 deletions.
103 changes: 64 additions & 39 deletions ydb/core/tx/columnshard/engines/scheme/index_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ bool TIndexInfo::DeserializeFromProto(const NKikimrSchemeOp::TColumnTableSchema&
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "cannot_parse_index_info")("reason", "incorrect_engine_in_schema");
return false;
}
AFL_VERIFY(cache);

{
TMemoryProfileGuard g("TIndexInfo::DeserializeFromProto::Optimizer");
Expand Down Expand Up @@ -235,39 +236,46 @@ bool TIndexInfo::DeserializeFromProto(const NKikimrSchemeOp::TColumnTableSchema&
TMemoryProfileGuard g("TIndexInfo::DeserializeFromProto::Columns");
for (const auto& col : schema.GetColumns()) {
const ui32 id = col.GetId();
const TString& name = col.GetName();
const TString& name = cache->GetStringCache(col.GetName());
const bool notNull = col.HasNotNull() ? col.GetNotNull() : false;
auto typeInfoMod = NScheme::TypeInfoModFromProtoColumnType(col.GetTypeId(), col.HasTypeInfo() ? &col.GetTypeInfo() : nullptr);
Columns[id] = NTable::TColumn(name, id, typeInfoMod.TypeInfo, typeInfoMod.TypeMod, notNull);
Columns[id] = NTable::TColumn(name, id, typeInfoMod.TypeInfo, cache->GetStringCache(typeInfoMod.TypeMod), notNull);
ColumnNames[name] = id;
}
}
for (const auto& keyName : schema.GetKeyColumnNames()) {
Y_ABORT_UNLESS(ColumnNames.contains(keyName));
KeyColumns.push_back(ColumnNames[keyName]);
}
InitializeCaches(operators, cache);
InitializeCaches(operators, cache, false);
{
TMemoryProfileGuard g("TIndexInfo::DeserializeFromProto::Columns::Features");
for (const auto& col : schema.GetColumns()) {
auto it = ColumnFeatures.find(col.GetId());
AFL_VERIFY(it != ColumnFeatures.end());
const TString fingerprint = cache ? col.SerializeAsString() : Default<TString>();
if (cache) {
if (std::shared_ptr<TColumnFeatures> f = cache->GetColumnFeatures(fingerprint)) {
it->second = f;
continue;
THashMap<ui32, std::shared_ptr<TColumnFeatures>> it;
const TString fingerprint = cache ? ("C:" + col.SerializeAsString()) : Default<TString>();
const auto createPred = [&]() -> TConclusion<std::shared_ptr<TColumnFeatures>> {
auto f = BuildDefaultColumnFeatures(col.GetId(), operators);
auto parsed = f->DeserializeFromProto(col, operators);
if (parsed.IsFail()) {
return parsed;
}
}

auto parsed = it->second->DeserializeFromProto(col, operators);
if (!parsed) {
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "cannot_parse_column_feature")("reason", parsed.GetErrorMessage());
return f;
};
auto fConclusion = cache->GetOrCreateColumnFeatures(fingerprint, createPred);
if (fConclusion.IsFail()) {
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "cannot_parse_column_feature")("reason", fConclusion.GetErrorMessage());
return false;
}
if (cache) {
cache->RegisterColumnFeatures(fingerprint, it->second);
}
AFL_VERIFY(ColumnFeatures.emplace(col.GetId(), fConclusion.DetachResult()).second);
}
for (auto&& cId : GetSystemColumnIds()) {
THashMap<ui32, std::shared_ptr<TColumnFeatures>> it;
const TString fingerprint = "SC:" + ::ToString(cId);
const auto createPred = [&]() -> TConclusion<std::shared_ptr<TColumnFeatures>> {
return BuildDefaultColumnFeatures(cId, operators);
};
auto fConclusion = cache->GetOrCreateColumnFeatures(fingerprint, createPred);
AFL_VERIFY(ColumnFeatures.emplace(cId, fConclusion.DetachResult()).second);
}
}

Expand Down Expand Up @@ -295,8 +303,8 @@ std::optional<TIndexInfo> TIndexInfo::BuildFromProto(const NKikimrSchemeOp::TCol
return result;
}

std::shared_ptr<arrow::Schema> MakeArrowSchema(
const NTable::TScheme::TTableSchema::TColumns& columns, const std::vector<ui32>& ids, const std::shared_ptr<TSchemaObjectsCache>& cache) {
std::vector<std::shared_ptr<arrow::Field>> MakeArrowFields(const NTable::TScheme::TTableSchema::TColumns& columns, const std::vector<ui32>& ids,
const std::shared_ptr<TSchemaObjectsCache>& cache) {
std::vector<std::shared_ptr<arrow::Field>> fields;
for (const ui32 id : ids) {
AFL_VERIFY(!TIndexInfo::IsSpecialColumn(id));
Expand All @@ -321,10 +329,16 @@ std::shared_ptr<arrow::Schema> MakeArrowSchema(
}
}

return std::make_shared<arrow::Schema>(std::move(fields));
return fields;
}

std::shared_ptr<arrow::Schema> MakeArrowSchema(
const NTable::TScheme::TTableSchema::TColumns& columns, const std::vector<ui32>& ids, const std::shared_ptr<TSchemaObjectsCache>& cache) {
return std::make_shared<arrow::Schema>(MakeArrowFields(columns, ids, cache));
}

void TIndexInfo::InitializeCaches(const std::shared_ptr<IStoragesManager>& operators, const std::shared_ptr<TSchemaObjectsCache>& cache) {
void TIndexInfo::InitializeCaches(const std::shared_ptr<IStoragesManager>& operators, const std::shared_ptr<TSchemaObjectsCache>& cache,
const bool withColumnFeatures) {
{
TMemoryProfileGuard g("TIndexInfo::DeserializeFromProto::InitializeCaches::Schema");
AFL_VERIFY(!Schema);
Expand All @@ -334,33 +348,31 @@ void TIndexInfo::InitializeCaches(const std::shared_ptr<IStoragesManager>& opera
}

std::sort(SchemaColumnIds.begin(), SchemaColumnIds.end());
auto originalSchema = MakeArrowSchema(Columns, SchemaColumnIds, cache);
Schema = std::make_shared<NArrow::TSchemaLite>(originalSchema);
SchemaWithSpecials = std::make_shared<NArrow::TSchemaLite>(IIndexInfo::AddSpecialFields(originalSchema));
auto originalFields = MakeArrowFields(Columns, SchemaColumnIds, cache);
Schema = std::make_shared<NArrow::TSchemaLite>(originalFields);
IIndexInfo::AddSpecialFields(originalFields);
SchemaWithSpecials = std::make_shared<NArrow::TSchemaLite>(originalFields);
}
{
TMemoryProfileGuard g("TIndexInfo::DeserializeFromProto::InitializeCaches::SchemaFields");
SchemaColumnIdsWithSpecials = IIndexInfo::AddSpecialFieldIds(SchemaColumnIds);
SchemaColumnIdsWithSpecialsSet = IIndexInfo::AddSpecialFieldIds(std::set<ui32>(SchemaColumnIds.begin(), SchemaColumnIds.end()));
ui32 idx = 0;
for (auto&& i : SchemaColumnIdsWithSpecials) {
AFL_VERIFY(IdIntoIndex.emplace(i, idx++).second);
}
}
{
TMemoryProfileGuard g("TIndexInfo::DeserializeFromProto::InitializeCaches::Columns");
for (auto&& c : Columns) {
AFL_VERIFY(ArrowColumnByColumnIdCache.emplace(c.first, GetColumnFieldVerified(c.first)).second);
AFL_VERIFY(ColumnFeatures.emplace(c.first, std::make_shared<TColumnFeatures>(c.first, GetColumnFieldVerified(c.first), DefaultSerializer, operators->GetDefaultOperator(),
NArrow::IsPrimitiveYqlType(c.second.PType), c.first == GetPKFirstColumnId(), nullptr)).second);
if (withColumnFeatures) {
{
TMemoryProfileGuard g("TIndexInfo::DeserializeFromProto::InitializeCaches::Columns");
for (auto&& c : Columns) {
AFL_VERIFY(ColumnFeatures.emplace(c.first, BuildDefaultColumnFeatures(c.first, operators)).second);
}
}
}
{
TMemoryProfileGuard g("TIndexInfo::DeserializeFromProto::InitializeCaches::SysColumns");
for (auto&& cId : GetSystemColumnIds()) {
AFL_VERIFY(ArrowColumnByColumnIdCache.emplace(cId, GetColumnFieldVerified(cId)).second);
AFL_VERIFY(ColumnFeatures.emplace(cId, std::make_shared<TColumnFeatures>(cId, GetColumnFieldVerified(cId), DefaultSerializer, operators->GetDefaultOperator(),
false, false, IIndexInfo::DefaultColumnValue(cId))).second);
{
TMemoryProfileGuard g("TIndexInfo::DeserializeFromProto::InitializeCaches::SysColumns");
for (auto&& cId : GetSystemColumnIds()) {
AFL_VERIFY(ColumnFeatures.emplace(cId, BuildDefaultColumnFeatures(cId, operators)).second);
}
}
}
}
Expand Down Expand Up @@ -448,4 +460,17 @@ std::vector<ui32> TIndexInfo::GetEntityIds() const {
return result;
}

std::shared_ptr<NKikimr::NOlap::TColumnFeatures> TIndexInfo::BuildDefaultColumnFeatures(
const ui32 columnId, const std::shared_ptr<IStoragesManager>& operators) const {
if (IsSpecialColumn(columnId)) {
return std::make_shared<TColumnFeatures>(columnId, GetColumnFieldVerified(columnId), DefaultSerializer, operators->GetDefaultOperator(),
false, false, IIndexInfo::DefaultColumnValue(columnId));
} else {
auto itC = Columns.find(columnId);
AFL_VERIFY(itC != Columns.end());
return std::make_shared<TColumnFeatures>(columnId, GetColumnFieldVerified(columnId), DefaultSerializer, operators->GetDefaultOperator(),
NArrow::IsPrimitiveYqlType(itC->second.PType), columnId == GetPKFirstColumnId(), nullptr);
}
}

} // namespace NKikimr::NOlap
38 changes: 26 additions & 12 deletions ydb/core/tx/columnshard/engines/scheme/index_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,19 @@ class TSchemaObjectsCache {
private:
THashMap<TString, std::shared_ptr<arrow::Field>> Fields;
THashMap<TString, std::shared_ptr<TColumnFeatures>> ColumnFeatures;
THashSet<TString> StringsCache;
mutable ui64 AcceptionFieldsCount = 0;
mutable ui64 AcceptionFeaturesCount = 0;

public:
const TString& GetStringCache(const TString& original) {
auto it = StringsCache.find(original);
if (it == StringsCache.end()) {
it = StringsCache.emplace(original).first;
}
return *it;
}

void RegisterField(const TString& fingerprint, const std::shared_ptr<arrow::Field>& f) {
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "register_field")("fp", fingerprint)("f", f->ToString());
AFL_VERIFY(Fields.emplace(fingerprint, f).second);
Expand All @@ -71,16 +80,23 @@ class TSchemaObjectsCache {
}
return it->second;
}
std::shared_ptr<TColumnFeatures> GetColumnFeatures(const TString& fingerprint) const {
template <class TConstructor>
TConclusion<std::shared_ptr<TColumnFeatures>> GetOrCreateColumnFeatures(const TString& fingerprint, const TConstructor& constructor) {
auto it = ColumnFeatures.find(fingerprint);
if (it == ColumnFeatures.end()) {
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "get_column_features_miss")("fp", UrlEscapeRet(fingerprint))(
"count", ColumnFeatures.size())("acc", AcceptionFeaturesCount);
return nullptr;
}
if (++AcceptionFeaturesCount % 1000 == 0) {
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "get_column_features_accept")("fp", UrlEscapeRet(fingerprint))(
"count", ColumnFeatures.size())("acc", AcceptionFeaturesCount);
TConclusion<std::shared_ptr<TColumnFeatures>> resultConclusion = constructor();
if (resultConclusion.IsFail()) {
return resultConclusion;
}
it = ColumnFeatures.emplace(fingerprint, resultConclusion.DetachResult()).first;
AFL_VERIFY(it->second);
} else {
if (++AcceptionFeaturesCount % 1000 == 0) {
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "get_column_features_accept")("fp", UrlEscapeRet(fingerprint))(
"count", ColumnFeatures.size())("acc", AcceptionFeaturesCount);
}
}
return it->second;
}
Expand All @@ -92,15 +108,15 @@ struct TIndexInfo: public NTable::TScheme::TTableSchema, public IIndexInfo {
private:
THashMap<ui32, ui32> IdIntoIndex;
THashMap<ui32, std::shared_ptr<TColumnFeatures>> ColumnFeatures;
THashMap<ui32, std::shared_ptr<arrow::Field>> ArrowColumnByColumnIdCache;
THashMap<ui32, NIndexes::TIndexMetaContainer> Indexes;
TIndexInfo(const TString& name);
bool SchemeNeedActualization = false;
std::shared_ptr<NStorageOptimizer::IOptimizerPlannerConstructor> CompactionPlannerConstructor;
bool ExternalGuaranteeExclusivePK = false;
bool DeserializeFromProto(const NKikimrSchemeOp::TColumnTableSchema& schema, const std::shared_ptr<IStoragesManager>& operators,
const std::shared_ptr<TSchemaObjectsCache>& cache);
void InitializeCaches(const std::shared_ptr<IStoragesManager>& operators, const std::shared_ptr<TSchemaObjectsCache>& cache);
void InitializeCaches(const std::shared_ptr<IStoragesManager>& operators, const std::shared_ptr<TSchemaObjectsCache>& cache, const bool withColumnFeatures = true);
std::shared_ptr<TColumnFeatures> BuildDefaultColumnFeatures(const ui32 columnId, const std::shared_ptr<IStoragesManager>& operators) const;

public:
std::shared_ptr<NStorageOptimizer::IOptimizerPlannerConstructor> GetCompactionPlannerConstructor() const;
Expand Down Expand Up @@ -312,9 +328,6 @@ struct TIndexInfo: public NTable::TScheme::TTableSchema, public IIndexInfo {
std::vector<TString> GetColumnNames(const std::vector<ui32>& ids) const;
std::vector<std::string> GetColumnSTLNames(const std::vector<ui32>& ids) const;
const std::vector<ui32>& GetColumnIds(const bool withSpecial = true) const;
const std::set<ui32>& GetColumnIdsSet() const {
return SchemaColumnIdsWithSpecialsSet;
}
const std::vector<ui32>& GetPKColumnIds() const {
AFL_VERIFY(PKColumnIds.size());
return PKColumnIds;
Expand Down Expand Up @@ -379,7 +392,6 @@ struct TIndexInfo: public NTable::TScheme::TTableSchema, public IIndexInfo {
TString Name;
std::vector<ui32> SchemaColumnIds;
std::vector<ui32> SchemaColumnIdsWithSpecials;
std::set<ui32> SchemaColumnIdsWithSpecialsSet;
std::vector<ui32> PKColumnIds;
std::shared_ptr<NArrow::TSchemaLite> SchemaWithSpecials;
std::shared_ptr<NArrow::TSchemaLite> Schema;
Expand All @@ -390,6 +402,8 @@ struct TIndexInfo: public NTable::TScheme::TTableSchema, public IIndexInfo {

std::shared_ptr<arrow::Schema> MakeArrowSchema(const NTable::TScheme::TTableSchema::TColumns& columns, const std::vector<ui32>& ids,
const std::shared_ptr<TSchemaObjectsCache>& cache = nullptr);
std::vector<std::shared_ptr<arrow::Field>> MakeArrowFields(const NTable::TScheme::TTableSchema::TColumns& columns, const std::vector<ui32>& ids,
const std::shared_ptr<TSchemaObjectsCache>& cache = nullptr);

/// Extracts columns with the specific ids from the schema.
std::vector<TNameTypeInfo> GetColumns(const NTable::TScheme::TTableSchema& tableSchema, const std::vector<ui32>& ids);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class ISnapshotSchema {
std::shared_ptr<NArrow::NAccessor::TColumnLoader> GetColumnLoaderVerified(const std::string& columnName) const;

bool IsSpecialColumnId(const ui32 columnId) const;
virtual const std::set<ui32>& GetColumnIds() const = 0;
virtual const std::vector<ui32>& GetColumnIds() const = 0;

virtual NArrow::NAccessor::TColumnSaver GetColumnSaver(const ui32 columnId) const = 0;
NArrow::NAccessor::TColumnSaver GetColumnSaver(const TString& columnName) const {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@

namespace NKikimr::NOlap {

TFilteredSnapshotSchema::TFilteredSnapshotSchema(const ISnapshotSchema::TPtr& originalSnapshot, const std::vector<ui32>& columnIds)
: TFilteredSnapshotSchema(originalSnapshot, std::set(columnIds.begin(), columnIds.end())) {
TFilteredSnapshotSchema::TFilteredSnapshotSchema(const ISnapshotSchema::TPtr& originalSnapshot, const std::set<ui32>& columnIds)
: TFilteredSnapshotSchema(originalSnapshot, std::vector(columnIds.begin(), columnIds.end())) {
}

TFilteredSnapshotSchema::TFilteredSnapshotSchema(const ISnapshotSchema::TPtr& originalSnapshot, const std::set<ui32>& columnIds)
TFilteredSnapshotSchema::TFilteredSnapshotSchema(const ISnapshotSchema::TPtr& originalSnapshot, const std::vector<ui32>& columnIds)
: OriginalSnapshot(originalSnapshot)
, ColumnIds(columnIds)
{
Expand All @@ -21,12 +21,12 @@ TFilteredSnapshotSchema::TFilteredSnapshotSchema(const ISnapshotSchema::TPtr& or
}

TColumnSaver TFilteredSnapshotSchema::GetColumnSaver(const ui32 columnId) const {
Y_ABORT_UNLESS(ColumnIds.contains(columnId));
AFL_VERIFY(std::find(ColumnIds.begin(), ColumnIds.end(), columnId) != ColumnIds.end());
return OriginalSnapshot->GetColumnSaver(columnId);
}

std::shared_ptr<TColumnLoader> TFilteredSnapshotSchema::GetColumnLoaderOptional(const ui32 columnId) const {
Y_ABORT_UNLESS(ColumnIds.contains(columnId));
AFL_VERIFY(std::find(ColumnIds.begin(), ColumnIds.end(), columnId) != ColumnIds.end());
return OriginalSnapshot->GetColumnLoaderOptional(columnId);
}

Expand All @@ -35,15 +35,15 @@ std::optional<ui32> TFilteredSnapshotSchema::GetColumnIdOptional(const std::stri
if (!result) {
return result;
}
if (!ColumnIds.contains(*result)) {
if (std::find(ColumnIds.begin(), ColumnIds.end(), *result) == ColumnIds.end()) {
return std::nullopt;
}
return result;
}

ui32 TFilteredSnapshotSchema::GetColumnIdVerified(const std::string& columnName) const {
auto result = OriginalSnapshot->GetColumnIdVerified(columnName);
AFL_VERIFY(ColumnIds.contains(result));
AFL_VERIFY(std::find(ColumnIds.begin(), ColumnIds.end(), result) != ColumnIds.end());
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace NKikimr::NOlap {
class TFilteredSnapshotSchema: public ISnapshotSchema {
ISnapshotSchema::TPtr OriginalSnapshot;
std::shared_ptr<NArrow::TSchemaLite> Schema;
std::set<ui32> ColumnIds;
std::vector<ui32> ColumnIds;
THashMap<ui32, ui32> IdIntoIndex;

protected:
Expand All @@ -18,7 +18,7 @@ class TFilteredSnapshotSchema: public ISnapshotSchema {
TFilteredSnapshotSchema(const ISnapshotSchema::TPtr& originalSnapshot, const std::vector<ui32>& columnIds);
TFilteredSnapshotSchema(const ISnapshotSchema::TPtr& originalSnapshot, const std::set<ui32>& columnIds);

virtual const std::set<ui32>& GetColumnIds() const override {
virtual const std::vector<ui32>& GetColumnIds() const override {
return ColumnIds;
}
TColumnSaver GetColumnSaver(const ui32 columnId) const override;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ class TSnapshotSchema: public ISnapshotSchema {
public:
TSnapshotSchema(TIndexInfo&& indexInfo, const TSnapshot& snapshot);

virtual const std::set<ui32>& GetColumnIds() const override {
return IndexInfo.GetColumnIdsSet();
virtual const std::vector<ui32>& GetColumnIds() const override {
return IndexInfo.GetColumnIds();
}

TColumnSaver GetColumnSaver(const ui32 columnId) const override;
Expand Down
5 changes: 5 additions & 0 deletions ydb/library/conclusion/result.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ class TConclusion {
: Result(result) {
}

template <class TResultArg>
TConclusion(TResultArg& result)
: Result(result) {
}

const TConclusionStatus& GetError() const {
auto result = std::get_if<TConclusionStatus>(&Result);
Y_ABORT_UNLESS(result, "incorrect object for error request");
Expand Down

0 comments on commit 0a1d586

Please sign in to comment.