Skip to content

Commit

Permalink
scanners unification plain/simple for reuse code (ydb-platform#12847)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored and zverevgeny committed Jan 2, 2025
1 parent c3f10e1 commit d500b87
Show file tree
Hide file tree
Showing 28 changed files with 1,104 additions and 1,440 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ namespace NKikimr::NOlap::NReader::NCommon {

TSpecialReadContext::TSpecialReadContext(const std::shared_ptr<TReadContext>& commonContext)
: CommonContext(commonContext) {
auto readMetadata = CommonContext->GetReadMetadataPtrVerifiedAs<TReadMetadata>();
Y_ABORT_UNLESS(readMetadata->SelectInfo);
ReadMetadata = CommonContext->GetReadMetadataPtrVerifiedAs<TReadMetadata>();
Y_ABORT_UNLESS(ReadMetadata->SelectInfo);

double kffAccessors = 0.01;
double kffFilter = 0.45;
double kffFetching = 0.45;
double kffMerge = 0.10;
TString stagePrefix;
if (readMetadata->GetEarlyFilterColumnIds().size()) {
if (ReadMetadata->GetEarlyFilterColumnIds().size()) {
stagePrefix = "EF";
kffFilter = 0.7;
kffFetching = 0.15;
Expand All @@ -41,15 +41,15 @@ TSpecialReadContext::TSpecialReadContext(const std::shared_ptr<TReadContext>& co
NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildStageFeatures(stagePrefix + "::MERGE", kffMerge * TGlobalLimits::ScanMemoryLimit)
};
ProcessMemoryGuard =
NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildProcessGuard(CommonContext->GetReadMetadata()->GetTxId(), stages);
ProcessScopeGuard = NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildScopeGuard(
CommonContext->GetReadMetadata()->GetTxId(), GetCommonContext()->GetScanId());
NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildProcessGuard(ReadMetadata->GetTxId(), stages);
ProcessScopeGuard =
NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildScopeGuard(ReadMetadata->GetTxId(), GetCommonContext()->GetScanId());

auto readSchema = readMetadata->GetResultSchema();
auto readSchema = ReadMetadata->GetResultSchema();
SpecColumns = std::make_shared<TColumnsSet>(TIndexInfo::GetSnapshotColumnIdsSet(), readSchema);
IndexChecker = readMetadata->GetProgram().GetIndexChecker();
IndexChecker = ReadMetadata->GetProgram().GetIndexChecker();
{
auto predicateColumns = readMetadata->GetPKRangesFilter().GetColumnIds(readMetadata->GetIndexInfo());
auto predicateColumns = ReadMetadata->GetPKRangesFilter().GetColumnIds(ReadMetadata->GetIndexInfo());
if (predicateColumns.size()) {
PredicateColumns = std::make_shared<TColumnsSet>(predicateColumns, readSchema);
} else {
Expand All @@ -58,26 +58,26 @@ TSpecialReadContext::TSpecialReadContext(const std::shared_ptr<TReadContext>& co
}
{
std::set<ui32> columnIds = { NPortion::TSpecialColumns::SPEC_COL_DELETE_FLAG_INDEX };
DeletionColumns = std::make_shared<TColumnsSet>(columnIds, readMetadata->GetResultSchema());
DeletionColumns = std::make_shared<TColumnsSet>(columnIds, ReadMetadata->GetResultSchema());
}

if (!!readMetadata->GetRequestShardingInfo()) {
if (!!ReadMetadata->GetRequestShardingInfo()) {
auto shardingColumnIds =
readMetadata->GetIndexInfo().GetColumnIdsVerified(readMetadata->GetRequestShardingInfo()->GetShardingInfo()->GetColumnNames());
ShardingColumns = std::make_shared<TColumnsSet>(shardingColumnIds, readMetadata->GetResultSchema());
ReadMetadata->GetIndexInfo().GetColumnIdsVerified(ReadMetadata->GetRequestShardingInfo()->GetShardingInfo()->GetColumnNames());
ShardingColumns = std::make_shared<TColumnsSet>(shardingColumnIds, ReadMetadata->GetResultSchema());
} else {
ShardingColumns = std::make_shared<TColumnsSet>();
}
{
auto efColumns = readMetadata->GetEarlyFilterColumnIds();
auto efColumns = ReadMetadata->GetEarlyFilterColumnIds();
if (efColumns.size()) {
EFColumns = std::make_shared<TColumnsSet>(efColumns, readSchema);
} else {
EFColumns = std::make_shared<TColumnsSet>();
}
}
if (readMetadata->HasProcessingColumnIds()) {
FFColumns = std::make_shared<TColumnsSet>(readMetadata->GetProcessingColumnIds(), readSchema);
if (ReadMetadata->HasProcessingColumnIds()) {
FFColumns = std::make_shared<TColumnsSet>(ReadMetadata->GetProcessingColumnIds(), readSchema);
if (SpecColumns->Contains(*FFColumns) && !EFColumns->IsEmpty()) {
FFColumns = std::make_shared<TColumnsSet>(*EFColumns + *SpecColumns);
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("ff_modified", FFColumns->DebugString());
Expand All @@ -95,7 +95,7 @@ TSpecialReadContext::TSpecialReadContext(const std::shared_ptr<TReadContext>& co
}
AllUsageColumns = std::make_shared<TColumnsSet>(*FFColumns + *PredicateColumns);

PKColumns = std::make_shared<TColumnsSet>(readMetadata->GetPKColumnIds(), readSchema);
PKColumns = std::make_shared<TColumnsSet>(ReadMetadata->GetPKColumnIds(), readSchema);
MergeColumns = std::make_shared<TColumnsSet>(*PKColumns + *SpecColumns);

AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("columns_context_info", DebugString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@
#include "columns_set.h"

#include <ydb/core/tx/columnshard/engines/reader/abstract/read_context.h>
#include <ydb/core/tx/columnshard/engines/reader/common_reader/constructor/read_metadata.h>
#include <ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.h>
#include <ydb/core/tx/limiter/grouped_memory/usage/abstract.h>

namespace NKikimr::NOlap::NReader::NCommon {

class TFetchingScript;
class IDataSource;

class TSpecialReadContext {
private:
YDB_READONLY_DEF(std::shared_ptr<TReadContext>, CommonContext);
Expand All @@ -28,13 +32,36 @@ class TSpecialReadContext {
YDB_READONLY_DEF(std::shared_ptr<NGroupedMemoryManager::TStageFeatures>, FilterStageMemory);
YDB_READONLY_DEF(std::shared_ptr<NGroupedMemoryManager::TStageFeatures>, FetchingStageMemory);

TReadMetadata::TConstPtr ReadMetadata;
TAtomic AbortFlag = 0;

virtual std::shared_ptr<TFetchingScript> DoGetColumnsFetchingPlan(const std::shared_ptr<IDataSource>& source) = 0;

protected:
NIndexes::TIndexCheckerContainer IndexChecker;
std::shared_ptr<TColumnsSet> EmptyColumns = std::make_shared<TColumnsSet>();

public:
template <class T>
std::shared_ptr<TFetchingScript> GetColumnsFetchingPlan(const std::shared_ptr<T>& source) {
return GetColumnsFetchingPlan(std::static_pointer_cast<IDataSource>(source));
}

std::shared_ptr<TFetchingScript> GetColumnsFetchingPlan(const std::shared_ptr<IDataSource>& source) {
return DoGetColumnsFetchingPlan(source);
}

const TReadMetadata::TConstPtr& GetReadMetadata() const {
return ReadMetadata;
}

template <class T>
std::shared_ptr<T> GetReadMetadataVerifiedAs() const {
auto result = std::dynamic_pointer_cast<T>(ReadMetadata);
AFL_VERIFY(!!result);
return result;
}

ui64 GetProcessMemoryControlId() const {
AFL_VERIFY(ProcessMemoryGuard);
return ProcessMemoryGuard->GetProcessId();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
#include "fetch_steps.h"
#include "source.h"

#include <ydb/core/formats/arrow/common/container.h>
#include <ydb/core/tx/columnshard/engines/scheme/abstract/index_info.h>
#include <ydb/core/tx/conveyor/usage/service.h>
#include <ydb/core/tx/limiter/grouped_memory/usage/service.h>

#include <ydb/library/formats/arrow/simple_arrays_cache.h>

namespace NKikimr::NOlap::NReader::NCommon {

TConclusion<bool> TColumnBlobsFetchingStep::DoExecuteInplace(
const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const {
return !source->StartFetchingColumns(source, step, Columns);
}

ui64 TColumnBlobsFetchingStep::GetProcessingDataSize(const std::shared_ptr<IDataSource>& source) const {
return source->GetColumnBlobBytes(Columns.GetColumnIds());
}

TConclusion<bool> TAssemblerStep::DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& /*step*/) const {
source->AssembleColumns(Columns);
return true;
}

ui64 TAssemblerStep::GetProcessingDataSize(const std::shared_ptr<IDataSource>& source) const {
return source->GetColumnRawBytes(Columns->GetColumnIds());
}

TConclusion<bool> TOptionalAssemblerStep::DoExecuteInplace(
const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& /*step*/) const {
source->AssembleColumns(Columns, !source->IsSourceInMemory());
return true;
}

ui64 TOptionalAssemblerStep::GetProcessingDataSize(const std::shared_ptr<IDataSource>& source) const {
return source->GetColumnsVolume(Columns->GetColumnIds(), EMemType::RawSequential);
}

bool TAllocateMemoryStep::TFetchingStepAllocation::DoOnAllocated(std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>&& guard,
const std::shared_ptr<NGroupedMemoryManager::IAllocation>& /*allocation*/) {
auto data = Source.lock();
if (!data || data->GetContext()->IsAborted()) {
guard->Release();
return false;
}
if (StageIndex == EStageFeaturesIndexes::Accessors) {
data->MutableStageData().SetAccessorsGuard(std::move(guard));
} else {
data->RegisterAllocationGuard(std::move(guard));
}
Step.Next();
auto task = std::make_shared<TStepAction>(data, std::move(Step), data->GetContext()->GetCommonContext()->GetScanActorId());
NConveyor::TScanServiceOperator::SendTaskToExecute(task);
return true;
}

TAllocateMemoryStep::TFetchingStepAllocation::TFetchingStepAllocation(
const std::shared_ptr<IDataSource>& source, const ui64 mem, const TFetchingScriptCursor& step, const EStageFeaturesIndexes stageIndex)
: TBase(mem)
, Source(source)
, Step(step)
, TasksGuard(source->GetContext()->GetCommonContext()->GetCounters().GetResourcesAllocationTasksGuard())
, StageIndex(stageIndex) {
}

void TAllocateMemoryStep::TFetchingStepAllocation::DoOnAllocationImpossible(const TString& errorMessage) {
auto sourcePtr = Source.lock();
if (sourcePtr) {
sourcePtr->GetContext()->GetCommonContext()->AbortWithError(
"cannot allocate memory for step " + Step.GetName() + ": '" + errorMessage + "'");
}
}

TConclusion<bool> TAllocateMemoryStep::DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const {
ui64 size = PredefinedSize.value_or(0);
for (auto&& i : Packs) {
ui32 sizeLocal = source->GetColumnsVolume(i.GetColumns().GetColumnIds(), i.GetMemType());
if (source->GetStageData().GetUseFilter() && i.GetMemType() != EMemType::Blob && source->GetContext()->GetReadMetadata()->HasLimit()) {
const ui32 filtered =
source->GetStageData().GetFilteredCount(source->GetRecordsCount(), source->GetContext()->GetReadMetadata()->GetLimitRobust());
if (filtered < source->GetRecordsCount()) {
sizeLocal = sizeLocal * 1.0 * filtered / source->GetRecordsCount();
}
}
size += sizeLocal;
}

auto allocation = std::make_shared<TFetchingStepAllocation>(source, size, step, StageIndex);
NGroupedMemoryManager::TScanMemoryLimiterOperator::SendToAllocation(source->GetContext()->GetProcessMemoryControlId(),
source->GetContext()->GetCommonContext()->GetScanId(), source->GetMemoryGroupId(), { allocation }, (ui32)StageIndex);
return false;
}

ui64 TAllocateMemoryStep::GetProcessingDataSize(const std::shared_ptr<IDataSource>& /*source*/) const {
return 0;
}

NKikimr::TConclusion<bool> TBuildStageResultStep::DoExecuteInplace(
const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& /*step*/) const {
source->BuildStageResult(source);
return true;
}

} // namespace NKikimr::NOlap::NReader::NCommon
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
#pragma once
#include "fetching.h"

#include <ydb/core/tx/limiter/grouped_memory/usage/abstract.h>

namespace NKikimr::NOlap::NReader::NCommon {

class TAllocateMemoryStep: public IFetchingStep {
private:
using TBase = IFetchingStep;
class TColumnsPack {
private:
YDB_READONLY_DEF(TColumnsSetIds, Columns);
YDB_READONLY(EMemType, MemType, EMemType::Blob);

public:
TColumnsPack(const TColumnsSetIds& columns, const EMemType memType)
: Columns(columns)
, MemType(memType) {
}
};
std::vector<TColumnsPack> Packs;
THashMap<ui32, THashSet<EMemType>> Control;
const EStageFeaturesIndexes StageIndex;
const std::optional<ui64> PredefinedSize;

protected:
class TFetchingStepAllocation: public NGroupedMemoryManager::IAllocation {
private:
using TBase = NGroupedMemoryManager::IAllocation;
std::weak_ptr<IDataSource> Source;
TFetchingScriptCursor Step;
NColumnShard::TCounterGuard TasksGuard;
const EStageFeaturesIndexes StageIndex;
virtual bool DoOnAllocated(std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>&& guard,
const std::shared_ptr<NGroupedMemoryManager::IAllocation>& allocation) override;
virtual void DoOnAllocationImpossible(const TString& errorMessage) override;

public:
TFetchingStepAllocation(const std::shared_ptr<IDataSource>& source, const ui64 mem, const TFetchingScriptCursor& step,
const EStageFeaturesIndexes stageIndex);
};
virtual TConclusion<bool> DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const override;
virtual ui64 GetProcessingDataSize(const std::shared_ptr<IDataSource>& source) const override;
virtual TString DoDebugString() const override {
return TStringBuilder() << "stage=" << StageIndex << ";";
}

public:
void AddAllocation(const TColumnsSetIds& ids, const EMemType memType) {
if (!ids.GetColumnsCount()) {
return;
}
for (auto&& i : ids.GetColumnIds()) {
AFL_VERIFY(Control[i].emplace(memType).second);
}
Packs.emplace_back(ids, memType);
}
EStageFeaturesIndexes GetStage() const {
return StageIndex;
}

TAllocateMemoryStep(const TColumnsSetIds& columns, const EMemType memType, const EStageFeaturesIndexes stageIndex)
: TBase("ALLOCATE_MEMORY::" + ::ToString(stageIndex))
, StageIndex(stageIndex) {
AddAllocation(columns, memType);
}

TAllocateMemoryStep(const ui64 memSize, const EStageFeaturesIndexes stageIndex)
: TBase("ALLOCATE_MEMORY::" + ::ToString(stageIndex))
, StageIndex(stageIndex)
, PredefinedSize(memSize) {
}
};

class TAssemblerStep: public IFetchingStep {
private:
using TBase = IFetchingStep;
YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, Columns);
virtual TString DoDebugString() const override {
return TStringBuilder() << "columns=" << Columns->DebugString() << ";";
}

public:
virtual ui64 GetProcessingDataSize(const std::shared_ptr<IDataSource>& source) const override;
virtual TConclusion<bool> DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const override;
TAssemblerStep(const std::shared_ptr<TColumnsSet>& columns, const TString& specName = Default<TString>())
: TBase("ASSEMBLER" + (specName ? "::" + specName : ""))
, Columns(columns) {
AFL_VERIFY(Columns);
AFL_VERIFY(Columns->GetColumnsCount());
}
};

class TBuildStageResultStep: public IFetchingStep {
private:
using TBase = IFetchingStep;

public:
virtual TConclusion<bool> DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& /*step*/) const override;
TBuildStageResultStep()
: TBase("BUILD_STAGE_RESULT") {
}
};

class TOptionalAssemblerStep: public IFetchingStep {
private:
using TBase = IFetchingStep;
YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, Columns);
virtual TString DoDebugString() const override {
return TStringBuilder() << "columns=" << Columns->DebugString() << ";";
}

public:
virtual ui64 GetProcessingDataSize(const std::shared_ptr<IDataSource>& source) const override;

virtual TConclusion<bool> DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const override;
TOptionalAssemblerStep(const std::shared_ptr<TColumnsSet>& columns, const TString& specName = Default<TString>())
: TBase("OPTIONAL_ASSEMBLER" + (specName ? "::" + specName : ""))
, Columns(columns) {
AFL_VERIFY(Columns);
AFL_VERIFY(Columns->GetColumnsCount());
}
};

class TColumnBlobsFetchingStep: public IFetchingStep {
private:
using TBase = IFetchingStep;
TColumnsSetIds Columns;

protected:
virtual TConclusion<bool> DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const override;
virtual TString DoDebugString() const override {
return TStringBuilder() << "columns=" << Columns.DebugString() << ";";
}

public:
virtual ui64 GetProcessingDataSize(const std::shared_ptr<IDataSource>& source) const override;
TColumnBlobsFetchingStep(const TColumnsSetIds& columns)
: TBase("FETCHING_COLUMNS")
, Columns(columns) {
AFL_VERIFY(Columns.GetColumnsCount());
}
};

} // namespace NKikimr::NOlap::NReader::NCommon
Loading

0 comments on commit d500b87

Please sign in to comment.