Skip to content

Commit

Permalink
Correct get schema validations (#11686)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Nov 18, 2024
1 parent e1522ed commit 9aed6fe
Show file tree
Hide file tree
Showing 12 changed files with 17 additions and 24 deletions.
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/columnshard__write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
return;
}

auto schema = TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetSchema(operation.GetTableId().GetSchemaVersion());
auto schema = TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetSchemaVerified(operation.GetTableId().GetSchemaVersion());
if (!schema) {
Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_FAIL);
auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,13 @@ std::shared_ptr<TPortionInfo> TPortionInfoConstructor::Build() {

ISnapshotSchema::TPtr TPortionInfoConstructor::GetSchema(const TVersionedIndex& index) const {
if (SchemaVersion) {
auto schema = index.GetSchema(SchemaVersion.value());
auto schema = index.GetSchemaVerified(SchemaVersion.value());
AFL_VERIFY(!!schema)("details", TStringBuilder() << "cannot find schema for version " << SchemaVersion.value());
return schema;
} else {
AFL_VERIFY(MinSnapshotDeprecated);
return index.GetSchemaVerified(*MinSnapshotDeprecated);
}
AFL_VERIFY(MinSnapshotDeprecated);
return index.GetSchema(*MinSnapshotDeprecated);
}

void TPortionInfoConstructor::AddMetadata(const ISnapshotSchema& snapshotSchema, const std::shared_ptr<arrow::RecordBatch>& batch) {
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/columnshard/engines/portions/portion_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,11 @@ const TString& TPortionInfo::GetIndexStorageId(const ui32 indexId, const TIndexI
ISnapshotSchema::TPtr TPortionInfo::GetSchema(const TVersionedIndex& index) const {
AFL_VERIFY(SchemaVersion);
if (SchemaVersion) {
auto schema = index.GetSchema(SchemaVersion.value());
auto schema = index.GetSchemaVerified(SchemaVersion.value());
AFL_VERIFY(!!schema)("details", TStringBuilder() << "cannot find schema for version " << SchemaVersion.value());
return schema;
}
return index.GetSchema(MinSnapshotDeprecated);
return index.GetSchemaVerified(MinSnapshotDeprecated);
}

ISnapshotSchema::TPtr TPortionInfo::TSchemaCursor::GetSchema(const TPortionInfoConstructor& portion) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ NKikimr::TConclusionStatus IScannerConstructor::ParseProgram(const TVersionedInd
}
//its possible dont use columns from filter where pk field compare with null and remove from PKFilter and program, but stay in kqp columns request
if (vIndex) {
for (auto&& i : vIndex->GetSchema(read.GetSnapshot())->GetIndexInfo().GetReplaceKey()->field_names()) {
for (auto&& i : vIndex->GetSchemaVerified(read.GetSnapshot())->GetIndexInfo().GetReplaceKey()->field_names()) {
const TString cId(i.data(), i.size());
namesChecker.erase(cId);
programColumns.erase(cId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,13 @@ struct TReadMetadataBase {
ISnapshotSchema::TPtr GetLoadSchemaVerified(const TPortionInfo& porition) const;

const std::shared_ptr<NArrow::TSchemaLite>& GetBlobSchema(const ui64 version) const {
return GetIndexVersions().GetSchema(version)->GetIndexInfo().ArrowSchema();
return GetIndexVersions().GetSchemaVerified(version)->GetIndexInfo().ArrowSchema();
}

const TIndexInfo& GetIndexInfo(const std::optional<TSnapshot>& version = {}) const {
AFL_VERIFY(ResultIndexSchema);
if (version && version < RequestSnapshot) {
return GetIndexVersions().GetSchema(*version)->GetIndexInfo();
return GetIndexVersions().GetSchemaVerified(*version)->GetIndexInfo();
}
return ResultIndexSchema->GetIndexInfo();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace NKikimr::NOlap::NReader::NPlain {
NKikimr::TConclusionStatus TIndexScannerConstructor::ParseProgram(
const TVersionedIndex* vIndex, const NKikimrTxDataShard::TEvKqpScan& proto, TReadDescription& read) const {
AFL_VERIFY(vIndex);
auto& indexInfo = vIndex->GetSchema(Snapshot)->GetIndexInfo();
auto& indexInfo = vIndex->GetSchemaVerified(Snapshot)->GetIndexInfo();
TIndexColumnResolver columnResolver(indexInfo);
return TBase::ParseProgram(vIndex, proto.GetOlapProgramType(), proto.GetOlapProgram(), read, columnResolver);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ struct TReadMetadata : public TReadMetadataBase {
std::shared_ptr<TReadStats> ReadStats;

TReadMetadata(const ui64 pathId, const std::shared_ptr<TVersionedIndex> info, const TSnapshot& snapshot, const ESorting sorting, const TProgramContainer& ssaProgram)
: TBase(info, sorting, ssaProgram, info->GetSchema(snapshot), snapshot)
: TBase(info, sorting, ssaProgram, info->GetSchemaVerified(snapshot), snapshot)
, PathId(pathId)
, ReadStats(std::make_shared<TReadStats>())
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,6 @@ class IDataSource {
return DoStartFetchingAccessor(sourcePtr, step);
}

virtual ui64 PredictAccessorMemoryBytes() const = 0;

bool AddTxConflict() {
if (!Context->GetCommonContext()->HasLock()) {
return false;
Expand Down Expand Up @@ -317,9 +315,6 @@ class TPortionDataSource: public IDataSource {
}

virtual bool DoStartFetchingAccessor(const std::shared_ptr<IDataSource>& sourcePtr, const TFetchingScriptCursor& step) override;
virtual ui64 PredictAccessorMemoryBytes() const override {
return Portion->PredictMetadataMemorySize(Schema->GetColumnsCount());
}

public:
virtual bool NeedAccessorsFetching() const override {
Expand Down Expand Up @@ -459,9 +454,6 @@ class TCommittedDataSource: public IDataSource {
virtual bool DoStartFetchingAccessor(const std::shared_ptr<IDataSource>& /*sourcePtr*/, const TFetchingScriptCursor& /*step*/) override {
return false;
}
virtual ui64 PredictAccessorMemoryBytes() const override {
return 0;
}

virtual ui64 GetColumnsVolume(const std::set<ui32>& columnIds, const EMemType type) const override {
AFL_VERIFY(columnIds.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class TVersionedIndex {
return sb;
}

ISnapshotSchema::TPtr GetSchema(const ui64 version) const {
ISnapshotSchema::TPtr GetSchemaOptional(const ui64 version) const {
auto it = SnapshotByVersion.find(version);
return it == SnapshotByVersion.end() ? nullptr : it->second;
}
Expand All @@ -84,7 +84,7 @@ class TVersionedIndex {
return it->second;
}

ISnapshotSchema::TPtr GetSchema(const TSnapshot& version) const {
ISnapshotSchema::TPtr GetSchemaVerified(const TSnapshot& version) const {
for (auto it = Snapshots.rbegin(); it != Snapshots.rend(); ++it) {
if (it->first <= version) {
return it->second;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/normalizer/portion/chunks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class TRowsAndBytesChangesTask: public NConveyor::ITask {
};

void TChunksNormalizer::TChunkInfo::InitSchema(const NColumnShard::TTablesManager& tm) {
Schema = tm.GetPrimaryIndexSafe().GetVersionedIndex().GetSchema(NOlap::TSnapshot(Key.GetPlanStep(), Key.GetTxId()));
Schema = tm.GetPrimaryIndexSafe().GetVersionedIndex().GetSchemaVerified(NOlap::TSnapshot(Key.GetPlanStep(), Key.GetTxId()));
}

TConclusion<std::vector<INormalizerTask::TPtr>> TChunksNormalizer::DoInit(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ TConclusion<std::vector<INormalizerTask::TPtr>> TNormalizer::DoInit(

for (auto&& [_, chunkWithPortionData] : portionsToWrite) {
package.emplace_back(
tablesManager.GetPrimaryIndexSafe().GetVersionedIndex().GetSchema(chunkWithPortionData.GetMinSnapshotDeprecated())->GetVersion(),
tablesManager.GetPrimaryIndexSafe().GetVersionedIndex().GetSchemaVerified(chunkWithPortionData.GetMinSnapshotDeprecated())->GetVersion(),
std::move(chunkWithPortionData));
if (package.size() == 100) {
std::vector<TPatchItem> local;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/tables_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ class TTablesManager {

const NOlap::TIndexInfo& GetIndexInfo(const NOlap::TSnapshot& version) const {
Y_ABORT_UNLESS(!!PrimaryIndex);
return PrimaryIndex->GetVersionedIndex().GetSchema(version)->GetIndexInfo();
return PrimaryIndex->GetVersionedIndex().GetSchemaVerified(version)->GetIndexInfo();
}

const std::unique_ptr<NOlap::IColumnEngine>& GetPrimaryIndex() const {
Expand Down

0 comments on commit 9aed6fe

Please sign in to comment.