Skip to content

Commit

Permalink
Merge 755dca1 into 438ee8d
Browse files Browse the repository at this point in the history
  • Loading branch information
lll-phill-lll authored May 30, 2024
2 parents 438ee8d + 755dca1 commit 6edef2e
Show file tree
Hide file tree
Showing 7 changed files with 94 additions and 71 deletions.
2 changes: 1 addition & 1 deletion ydb/library/yql/minikql/aligned_page_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ void TAlignedPagePoolImpl<T>::Free(void* ptr, size_t size) noexcept {
template<typename T>
void TAlignedPagePoolImpl<T>::UpdateMemoryYellowZone() {
if (Limit == 0) return;
if (IncreaseMemoryLimitCallback && !IsMaximumLimitValueReached) return;
// if (IncreaseMemoryLimitCallback && !IsMaximumLimitValueReached) return;

ui8 usedMemoryPercent = 100 * GetUsed() / Limit;
if (usedMemoryPercent >= EnableMemoryYellowZoneThreshold) {
Expand Down
11 changes: 7 additions & 4 deletions ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -631,9 +631,9 @@ class TGraceJoinSpillingSupportState : public TComputationValue<TGraceJoinSpilli
}

bool IsSwitchToSpillingModeCondition() const {
return false;
// return false;
// TODO: YQL-18033
// return !HasMemoryForProcessing();
return !HasMemoryForProcessing();
}

void SwitchMode(EOperatingMode mode, TComputationContext& ctx) {
Expand Down Expand Up @@ -863,26 +863,29 @@ EFetchResult ProcessSpilledData(TComputationContext&, NUdf::TUnboxedValue*const*
if (IsRestoringSpilledBuckets()) return EFetchResult::Yield;

if (LeftPacker->TablePtr->IsSpilledBucketWaitingForExtraction(NextBucketToJoin)) {
std::cerr << std::format("[MISHA][LEFT][{}] Extracting bucket\n", NextBucketToJoin);
LeftPacker->TablePtr->PrepareBucket(NextBucketToJoin);
}

if (RightPacker->TablePtr->IsSpilledBucketWaitingForExtraction(NextBucketToJoin)) {
std::cerr << std::format("[MISHA][RIGHT][{}] Extracting bucket\n", NextBucketToJoin);
RightPacker->TablePtr->PrepareBucket(NextBucketToJoin);
}

if (!LeftPacker->TablePtr->IsBucketInMemory(NextBucketToJoin)) {
std::cerr << std::format("[MISHA][LEFT][{}] Loading bucket\n", NextBucketToJoin);
LeftPacker->TablePtr->StartLoadingBucket(NextBucketToJoin);
}

if (!RightPacker->TablePtr->IsBucketInMemory(NextBucketToJoin)) {
std::cerr << std::format("[MISHA][RIGHT][{}] Loading bucket\n", NextBucketToJoin);
RightPacker->TablePtr->StartLoadingBucket(NextBucketToJoin);
}

if (LeftPacker->TablePtr->IsBucketInMemory(NextBucketToJoin) && RightPacker->TablePtr->IsBucketInMemory(NextBucketToJoin)) {
if (*PartialJoinCompleted) {
while (JoinedTablePtr->NextJoinedData(LeftPacker->JoinTupleData, RightPacker->JoinTupleData)) {
while (JoinedTablePtr->NextJoinedData(LeftPacker->JoinTupleData, RightPacker->JoinTupleData, NextBucketToJoin + 1)) {
UnpackJoinedData(output);

return EFetchResult::One;
}

Expand Down
138 changes: 76 additions & 62 deletions ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,21 +154,21 @@ void TTable::ResetIterator() {
}

// Checks if there are more tuples and sets bucketId and tupleId to next valid.
inline bool HasMoreTuples(std::vector<TTableBucketStats> & tableBucketsStats, ui64 & bucketId, ui64 & tupleId ) {
inline bool HasMoreTuples(std::vector<TTableBucketStats> & tableBucketsStats, ui64 & bucketId, ui64 & tupleId, ui64 bucketLimit ) {

if (bucketId >= tableBucketsStats.size()) return false;
if (bucketId >= bucketLimit) return false;

if ( tupleId >= tableBucketsStats[bucketId].TuplesNum ) {
tupleId = 0;
bucketId ++;

if (bucketId == tableBucketsStats.size()) {
if (bucketId == bucketLimit) {
return false;
}

while( tableBucketsStats[bucketId].TuplesNum == 0 ) {
bucketId ++;
if (bucketId == tableBucketsStats.size()) {
if (bucketId == bucketLimit) {
return false;
}
}
Expand All @@ -181,7 +181,7 @@ inline bool HasMoreTuples(std::vector<TTableBucketStats> & tableBucketsStats, ui

// Returns value of next tuple. Returs true if there are more tuples
bool TTable::NextTuple(TupleData & td){
if (HasMoreTuples(TableBucketsStats, CurrIterBucket, CurrIterIndex )) {
if (HasMoreTuples(TableBucketsStats, CurrIterBucket, CurrIterIndex, TableBucketsStats.size())) {
GetTupleData(CurrIterBucket, CurrIterIndex, td);
CurrIterIndex++;
return true;
Expand Down Expand Up @@ -767,15 +767,15 @@ inline bool HasRightIdMatch(ui64 currId, ui64 & rightIdIter, const std::vector<u
}


bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) {
bool TTable::NextJoinedData( TupleData & td1, TupleData & td2, ui64 bucketLimit) {

if (JoinKind == EJoinKind::Cross) {

if (HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex))
if (HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex, bucketLimit))
{
JoinTable1->GetTupleData(JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex, td1);

if (HasMoreTuples(JoinTable2->TableBucketsStats, JoinTable2->CurrIterBucket, JoinTable2->CurrIterIndex))
if (HasMoreTuples(JoinTable2->TableBucketsStats, JoinTable2->CurrIterBucket, JoinTable2->CurrIterIndex, bucketLimit))
{
JoinTable2->GetTupleData(JoinTable2->CurrIterBucket, JoinTable2->CurrIterIndex, td2);
JoinTable2->CurrIterIndex++;
Expand All @@ -786,15 +786,15 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) {
JoinTable2->CurrIterBucket = 0;
JoinTable2->CurrIterIndex = 0;
JoinTable1->CurrIterIndex++;
return NextJoinedData(td1, td2);
return NextJoinedData(td1, td2, bucketLimit);
}
}
else
return false;
}

if ( JoinKind == EJoinKind::Inner ) {
while(HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) {
while(HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex, bucketLimit)) {
ui32 tupleId2;
if (HasJoinedTupleId(JoinTable1, tupleId2))
{
Expand All @@ -810,7 +810,7 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) {
}

if ( JoinKind == EJoinKind::Left ) {
while (HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) {
while (HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex, bucketLimit)) {
ui32 tupleId2;
if (HasJoinedTupleId(JoinTable1, tupleId2))
{
Expand Down Expand Up @@ -845,7 +845,7 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) {
}

if ( JoinKind == EJoinKind::Right ) {
while(HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) {
while(HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex, bucketLimit)) {
ui32 tupleId2;
if (HasJoinedTupleId(JoinTable1, tupleId2))
{
Expand Down Expand Up @@ -886,7 +886,7 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) {
if ( RightTableBatch_ && HasMoreRightTuples_ )
return false;

while(HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) {
while(HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex, bucketLimit)) {
ui32 tupleId2;

bool globalMatchedId = false;
Expand Down Expand Up @@ -917,7 +917,7 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) {
if (LeftTableBatch_ && HasMoreLeftTuples_ )
return false;

while(HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) {
while(HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex, bucketLimit)) {
ui32 tupleId2;

bool globalMatchedId = false;
Expand Down Expand Up @@ -949,7 +949,7 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) {
if (RightTableBatch_ && HasMoreRightTuples_ )
return false;

while(HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) {
while(HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex, bucketLimit)) {
ui32 tupleId2;

if ( !RightTableBatch_ && HasJoinedTupleId(JoinTable1, tupleId2))
Expand Down Expand Up @@ -983,7 +983,7 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) {
if (LeftTableBatch_ && HasMoreLeftTuples_ )
return false;

while(HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) {
while(HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex, bucketLimit)) {
ui32 tupleId2;
if ( !LeftTableBatch_ && HasJoinedTupleId(JoinTable1, tupleId2))
{
Expand All @@ -1010,7 +1010,7 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) {
}

if ( JoinKind == EJoinKind::Full ) {
if(HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) {
if(HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex, bucketLimit)) {
ui32 tupleId2;
if (HasJoinedTupleId(JoinTable1, tupleId2))
{
Expand All @@ -1036,7 +1036,7 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) {
Table2Initialized_ = true;
}

while (HasMoreTuples(JoinTable2->TableBucketsStats, JoinTable2->CurrIterBucket, JoinTable2->CurrIterIndex)) {
while (HasMoreTuples(JoinTable2->TableBucketsStats, JoinTable2->CurrIterBucket, JoinTable2->CurrIterIndex, bucketLimit)) {

if (CurrIterBucket != JoinTable2->CurrIterBucket) {
CurrIterBucket = JoinTable2->CurrIterBucket;
Expand All @@ -1060,7 +1060,7 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) {
}

if ( JoinKind == EJoinKind::Exclusion ) {
while (HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) {
while (HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex, bucketLimit)) {
ui32 tupleId2;
if (HasJoinedTupleId(JoinTable1, tupleId2))
{
Expand All @@ -1078,7 +1078,7 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) {

td1.AllNulls = true;

while (HasMoreTuples(JoinTable2->TableBucketsStats, JoinTable2->CurrIterBucket, JoinTable2->CurrIterIndex)) {
while (HasMoreTuples(JoinTable2->TableBucketsStats, JoinTable2->CurrIterBucket, JoinTable2->CurrIterIndex, bucketLimit)) {

if (CurrIterBucket != JoinTable2->CurrIterBucket) {
CurrIterBucket = JoinTable2->CurrIterBucket;
Expand Down Expand Up @@ -1433,71 +1433,85 @@ void TTableBucketSpiller::ProcessBucketRestoration() {
while (NextVectorToProcess != ENextVectorToProcess::None) {
switch (NextVectorToProcess) {
case ENextVectorToProcess::KeyAndVals:
if (StateUi64Adapter.HasRunningAsyncIoOperation()) return;
if (StateUi64Adapter.IsDataReady()) {
AppendVector(CurrentBucket.KeyIntVals, StateUi64Adapter.ExtractVector());
NextVectorToProcess = ENextVectorToProcess::DataIntVals;
break;
}

if (!StateUi64Adapter.IsDataReady()) {
if (StateUi64Adapter.IsAcceptingDataRequests()) {
StateUi64Adapter.RequestNextVector();
if (StateUi64Adapter.HasRunningAsyncIoOperation()) return;
break;
}
AppendVector(CurrentBucket.KeyIntVals, StateUi64Adapter.ExtractVector());
NextVectorToProcess = ENextVectorToProcess::DataIntVals;
break;
return;
case ENextVectorToProcess::DataIntVals:
if (StateUi64Adapter.HasRunningAsyncIoOperation()) return;
if (StateUi64Adapter.IsDataReady()) {
AppendVector(CurrentBucket.DataIntVals, StateUi64Adapter.ExtractVector());
NextVectorToProcess = ENextVectorToProcess::StringsValues;
break;
}

if (!StateUi64Adapter.IsDataReady()) {
if (StateUi64Adapter.IsAcceptingDataRequests()) {
StateUi64Adapter.RequestNextVector();
if (StateUi64Adapter.HasRunningAsyncIoOperation()) return;
break;
}
AppendVector(CurrentBucket.DataIntVals, StateUi64Adapter.ExtractVector());
NextVectorToProcess = ENextVectorToProcess::StringsValues;
break;
return;
case ENextVectorToProcess::StringsValues:
if (StateCharAdapter.HasRunningAsyncIoOperation()) return;
if (StateCharAdapter.IsDataReady()) {
AppendVector(CurrentBucket.StringsValues, StateCharAdapter.ExtractVector());
NextVectorToProcess = ENextVectorToProcess::StringsOffsets;
break;
}

if (!StateCharAdapter.IsDataReady()) {
if (StateCharAdapter.IsAcceptingDataRequests()) {
StateCharAdapter.RequestNextVector();
if (StateCharAdapter.HasRunningAsyncIoOperation()) return;
break;
}
AppendVector(CurrentBucket.StringsValues, StateCharAdapter.ExtractVector());
NextVectorToProcess = ENextVectorToProcess::StringsOffsets;
break;
return;
case ENextVectorToProcess::StringsOffsets:
if (StateUi32Adapter.HasRunningAsyncIoOperation()) return;
if (StateUi32Adapter.IsDataReady()) {
AppendVector(CurrentBucket.StringsOffsets, StateUi32Adapter.ExtractVector());
NextVectorToProcess = ENextVectorToProcess::InterfaceValues;
break;
}

if (!StateUi32Adapter.IsDataReady()) {
if (StateUi32Adapter.IsAcceptingDataRequests()) {
StateUi32Adapter.RequestNextVector();
if (StateUi32Adapter.HasRunningAsyncIoOperation()) return;
break;
}
AppendVector(CurrentBucket.StringsOffsets, StateUi32Adapter.ExtractVector());
NextVectorToProcess = ENextVectorToProcess::InterfaceValues;
break;
return;
case ENextVectorToProcess::InterfaceValues:
if (StateCharAdapter.HasRunningAsyncIoOperation()) return;
if (StateCharAdapter.IsDataReady()) {
AppendVector(CurrentBucket.InterfaceValues, StateCharAdapter.ExtractVector());
NextVectorToProcess = ENextVectorToProcess::InterfaceOffsets;
break;
}

if (!StateCharAdapter.IsDataReady()) {
if (StateCharAdapter.IsAcceptingDataRequests()) {
StateCharAdapter.RequestNextVector();
if (StateCharAdapter.HasRunningAsyncIoOperation()) return;
break;
}
AppendVector(CurrentBucket.InterfaceValues, StateCharAdapter.ExtractVector());
NextVectorToProcess = ENextVectorToProcess::InterfaceOffsets;
break;
return;
case ENextVectorToProcess::InterfaceOffsets:
if (StateUi32Adapter.HasRunningAsyncIoOperation()) return;
if (StateUi32Adapter.IsDataReady()) {
AppendVector(CurrentBucket.InterfaceOffsets, StateUi32Adapter.ExtractVector());

SpilledBucketsCount--;
if (SpilledBucketsCount == 0) {
NextVectorToProcess = ENextVectorToProcess::None;
State = EState::WaitingForExtraction;
} else {
NextVectorToProcess = ENextVectorToProcess::KeyAndVals;
}

break;
}

if (!StateUi32Adapter.IsDataReady()) {
if (StateUi32Adapter.IsAcceptingDataRequests()) {
StateUi32Adapter.RequestNextVector();
if (StateUi32Adapter.HasRunningAsyncIoOperation()) return;
break;
}
AppendVector(CurrentBucket.InterfaceOffsets, StateUi32Adapter.ExtractVector());
SpilledBucketsCount--;
if (SpilledBucketsCount == 0) {
NextVectorToProcess = ENextVectorToProcess::None;
State = EState::WaitingForExtraction;
} else {
NextVectorToProcess = ENextVectorToProcess::KeyAndVals;
}
break;
return;
default:
return;

Expand Down
6 changes: 5 additions & 1 deletion ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,11 @@ class TTable {
void Join(TTable& t1, TTable& t2, EJoinKind joinKind = EJoinKind::Inner, bool hasMoreLeftTuples = false, bool hasMoreRightTuples = false, ui32 fromBucket = 0, ui32 toBucket = NumberOfBuckets);

// Returns next jointed tuple data. Returs true if there are more tuples
bool NextJoinedData(TupleData& td1, TupleData& td2);
bool NextJoinedData(TupleData& td1, TupleData& td2, ui64 bucketLimit);

bool NextJoinedData(TupleData& td1, TupleData& td2) {
return NextJoinedData(td1, td2, JoinTable1->TableBucketsStats.size());
}

// Creates buckets that support spilling.
void InitializeBucketSpillers(ISpiller::TPtr spiller);
Expand Down
3 changes: 2 additions & 1 deletion ydb/library/yql/minikql/comp_nodes/mkql_wide_top_sort.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,8 @@ using TBase = TComputationValue<TSpillingSupportState<Sort, HasCount>>;
EOperatingMode GetMode() const { return Mode; }

bool HasMemoryForProcessing() const {
return !TlsAllocState->IsMemoryYellowZoneEnabled();
return true;
// return !TlsAllocState->IsMemoryYellowZoneEnabled();
}

bool IsReadFromChannelFinished() const {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <format>
#include <queue>

#include <ydb/library/yql/minikql/defs.h>
Expand Down Expand Up @@ -115,7 +116,7 @@ class TVectorSpillerAdapter {
///State will be changed to DataREady without any async read operation. ExtractVector is expected
///to be called immediately.
void RequestNextVector() {
MKQL_ENSURE(State == EState::AcceptingDataRequests, "Internal logic error");
MKQL_ENSURE(State == EState::AcceptingDataRequests, std::format("[MISHA] Actual {}\n", (int)State));
MKQL_ENSURE(CurrentVector.empty(), "Internal logic error");
MKQL_ENSURE(!StoredChunksElementsCount.empty(), "Internal logic error");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ namespace NYql::NDqs {
NDq::NTaskRunnerActor::ITaskRunnerActorFactory::TPtr TaskRunnerActorFactory;
THashMap<TString, TString> ClusterNamesMapping;

ui64 MkqlInitialMemoryLimit = 8_GB;
ui64 MkqlInitialMemoryLimit = 50_MB;
ui64 MkqlTotalMemoryLimit = 0;
ui64 MkqlMinAllocSize = 30_MB;
ui64 MkqlProgramHardMemoryLimit = 0;
Expand Down

0 comments on commit 6edef2e

Please sign in to comment.