Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Correct get schema validations #11686

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/columnshard__write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
return;
}

auto schema = TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetSchema(operation.GetTableId().GetSchemaVersion());
auto schema = TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetSchemaVerified(operation.GetTableId().GetSchemaVersion());
if (!schema) {
Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_FAIL);
auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,13 @@ std::shared_ptr<TPortionInfo> TPortionInfoConstructor::Build() {

ISnapshotSchema::TPtr TPortionInfoConstructor::GetSchema(const TVersionedIndex& index) const {
if (SchemaVersion) {
auto schema = index.GetSchema(SchemaVersion.value());
auto schema = index.GetSchemaVerified(SchemaVersion.value());
AFL_VERIFY(!!schema)("details", TStringBuilder() << "cannot find schema for version " << SchemaVersion.value());
return schema;
} else {
AFL_VERIFY(MinSnapshotDeprecated);
return index.GetSchemaVerified(*MinSnapshotDeprecated);
}
AFL_VERIFY(MinSnapshotDeprecated);
return index.GetSchema(*MinSnapshotDeprecated);
}

void TPortionInfoConstructor::AddMetadata(const ISnapshotSchema& snapshotSchema, const std::shared_ptr<arrow::RecordBatch>& batch) {
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/columnshard/engines/portions/portion_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,11 @@ const TString& TPortionInfo::GetIndexStorageId(const ui32 indexId, const TIndexI
ISnapshotSchema::TPtr TPortionInfo::GetSchema(const TVersionedIndex& index) const {
AFL_VERIFY(SchemaVersion);
if (SchemaVersion) {
auto schema = index.GetSchema(SchemaVersion.value());
auto schema = index.GetSchemaVerified(SchemaVersion.value());
AFL_VERIFY(!!schema)("details", TStringBuilder() << "cannot find schema for version " << SchemaVersion.value());
return schema;
}
return index.GetSchema(MinSnapshotDeprecated);
return index.GetSchemaVerified(MinSnapshotDeprecated);
}

ISnapshotSchema::TPtr TPortionInfo::TSchemaCursor::GetSchema(const TPortionInfoConstructor& portion) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ NKikimr::TConclusionStatus IScannerConstructor::ParseProgram(const TVersionedInd
}
//its possible dont use columns from filter where pk field compare with null and remove from PKFilter and program, but stay in kqp columns request
if (vIndex) {
for (auto&& i : vIndex->GetSchema(read.GetSnapshot())->GetIndexInfo().GetReplaceKey()->field_names()) {
for (auto&& i : vIndex->GetSchemaVerified(read.GetSnapshot())->GetIndexInfo().GetReplaceKey()->field_names()) {
const TString cId(i.data(), i.size());
namesChecker.erase(cId);
programColumns.erase(cId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,13 @@ struct TReadMetadataBase {
ISnapshotSchema::TPtr GetLoadSchemaVerified(const TPortionInfo& porition) const;

const std::shared_ptr<NArrow::TSchemaLite>& GetBlobSchema(const ui64 version) const {
return GetIndexVersions().GetSchema(version)->GetIndexInfo().ArrowSchema();
return GetIndexVersions().GetSchemaVerified(version)->GetIndexInfo().ArrowSchema();
}

const TIndexInfo& GetIndexInfo(const std::optional<TSnapshot>& version = {}) const {
AFL_VERIFY(ResultIndexSchema);
if (version && version < RequestSnapshot) {
return GetIndexVersions().GetSchema(*version)->GetIndexInfo();
return GetIndexVersions().GetSchemaVerified(*version)->GetIndexInfo();
}
return ResultIndexSchema->GetIndexInfo();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace NKikimr::NOlap::NReader::NPlain {
NKikimr::TConclusionStatus TIndexScannerConstructor::ParseProgram(
const TVersionedIndex* vIndex, const NKikimrTxDataShard::TEvKqpScan& proto, TReadDescription& read) const {
AFL_VERIFY(vIndex);
auto& indexInfo = vIndex->GetSchema(Snapshot)->GetIndexInfo();
auto& indexInfo = vIndex->GetSchemaVerified(Snapshot)->GetIndexInfo();
TIndexColumnResolver columnResolver(indexInfo);
return TBase::ParseProgram(vIndex, proto.GetOlapProgramType(), proto.GetOlapProgram(), read, columnResolver);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ struct TReadMetadata : public TReadMetadataBase {
std::shared_ptr<TReadStats> ReadStats;

TReadMetadata(const ui64 pathId, const std::shared_ptr<TVersionedIndex> info, const TSnapshot& snapshot, const ESorting sorting, const TProgramContainer& ssaProgram)
: TBase(info, sorting, ssaProgram, info->GetSchema(snapshot), snapshot)
: TBase(info, sorting, ssaProgram, info->GetSchemaVerified(snapshot), snapshot)
, PathId(pathId)
, ReadStats(std::make_shared<TReadStats>())
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,6 @@ class IDataSource {
return DoStartFetchingAccessor(sourcePtr, step);
}

virtual ui64 PredictAccessorMemoryBytes() const = 0;

bool AddTxConflict() {
if (!Context->GetCommonContext()->HasLock()) {
return false;
Expand Down Expand Up @@ -317,9 +315,6 @@ class TPortionDataSource: public IDataSource {
}

virtual bool DoStartFetchingAccessor(const std::shared_ptr<IDataSource>& sourcePtr, const TFetchingScriptCursor& step) override;
virtual ui64 PredictAccessorMemoryBytes() const override {
return Portion->PredictMetadataMemorySize(Schema->GetColumnsCount());
}

public:
virtual bool NeedAccessorsFetching() const override {
Expand Down Expand Up @@ -459,9 +454,6 @@ class TCommittedDataSource: public IDataSource {
virtual bool DoStartFetchingAccessor(const std::shared_ptr<IDataSource>& /*sourcePtr*/, const TFetchingScriptCursor& /*step*/) override {
return false;
}
virtual ui64 PredictAccessorMemoryBytes() const override {
return 0;
}

virtual ui64 GetColumnsVolume(const std::set<ui32>& columnIds, const EMemType type) const override {
AFL_VERIFY(columnIds.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class TVersionedIndex {
return sb;
}

ISnapshotSchema::TPtr GetSchema(const ui64 version) const {
ISnapshotSchema::TPtr GetSchemaOptional(const ui64 version) const {
auto it = SnapshotByVersion.find(version);
return it == SnapshotByVersion.end() ? nullptr : it->second;
}
Expand All @@ -84,7 +84,7 @@ class TVersionedIndex {
return it->second;
}

ISnapshotSchema::TPtr GetSchema(const TSnapshot& version) const {
ISnapshotSchema::TPtr GetSchemaVerified(const TSnapshot& version) const {
for (auto it = Snapshots.rbegin(); it != Snapshots.rend(); ++it) {
if (it->first <= version) {
return it->second;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/normalizer/portion/chunks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class TRowsAndBytesChangesTask: public NConveyor::ITask {
};

void TChunksNormalizer::TChunkInfo::InitSchema(const NColumnShard::TTablesManager& tm) {
Schema = tm.GetPrimaryIndexSafe().GetVersionedIndex().GetSchema(NOlap::TSnapshot(Key.GetPlanStep(), Key.GetTxId()));
Schema = tm.GetPrimaryIndexSafe().GetVersionedIndex().GetSchemaVerified(NOlap::TSnapshot(Key.GetPlanStep(), Key.GetTxId()));
}

TConclusion<std::vector<INormalizerTask::TPtr>> TChunksNormalizer::DoInit(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ TConclusion<std::vector<INormalizerTask::TPtr>> TNormalizer::DoInit(

for (auto&& [_, chunkWithPortionData] : portionsToWrite) {
package.emplace_back(
tablesManager.GetPrimaryIndexSafe().GetVersionedIndex().GetSchema(chunkWithPortionData.GetMinSnapshotDeprecated())->GetVersion(),
tablesManager.GetPrimaryIndexSafe().GetVersionedIndex().GetSchemaVerified(chunkWithPortionData.GetMinSnapshotDeprecated())->GetVersion(),
std::move(chunkWithPortionData));
if (package.size() == 100) {
std::vector<TPatchItem> local;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/tables_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ class TTablesManager {

const NOlap::TIndexInfo& GetIndexInfo(const NOlap::TSnapshot& version) const {
Y_ABORT_UNLESS(!!PrimaryIndex);
return PrimaryIndex->GetVersionedIndex().GetSchema(version)->GetIndexInfo();
return PrimaryIndex->GetVersionedIndex().GetSchemaVerified(version)->GetIndexInfo();
}

const std::unique_ptr<NOlap::IColumnEngine>& GetPrimaryIndex() const {
Expand Down
Loading