diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp index 12c1b2080e4b..e68d15f42084 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp @@ -868,14 +868,13 @@ EFetchResult ProcessSpilledData(TComputationContext&, NUdf::TUnboxedValue*const* UnpackJoinedData(output); return EFetchResult::One; - } LeftPacker->TuplesBatchPacked = 0; - LeftPacker->TablePtr->Clear(); // Clear table content, ready to collect data for next batch + LeftPacker->TablePtr->ClearBucket(NextBucketToJoin); // Clear content of returned bucket RightPacker->TuplesBatchPacked = 0; - RightPacker->TablePtr->Clear(); // Clear table content, ready to collect data for next batch + RightPacker->TablePtr->ClearBucket(NextBucketToJoin); // Clear content of returned bucket JoinedTablePtr->Clear(); JoinedTablePtr->ResetIterator(); @@ -883,8 +882,8 @@ EFetchResult ProcessSpilledData(TComputationContext&, NUdf::TUnboxedValue*const* NextBucketToJoin++; } else { - LeftPacker->TablePtr->ExtractBucket(NextBucketToJoin); - RightPacker->TablePtr->ExtractBucket(NextBucketToJoin); + LeftPacker->TablePtr->PrepareBucket(NextBucketToJoin); + RightPacker->TablePtr->PrepareBucket(NextBucketToJoin); *PartialJoinCompleted = true; LeftPacker->StartTime = std::chrono::system_clock::now(); RightPacker->StartTime = std::chrono::system_clock::now(); diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp index 0f1b2ed44a70..2b71f4c4aa0e 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp @@ -296,7 +296,6 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef for (ui64 bucket = fromBucket; bucket < toBucket; bucket++) { - joinResults.clear(); TTableBucket * bucket1 = &JoinTable1->TableBuckets[bucket]; TTableBucket * bucket2 = &JoinTable2->TableBuckets[bucket]; @@ -1107,25 +1106,28 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) { } void TTable::Clear() { - for (ui64 bucket = 0; bucket < NumberOfBuckets; bucket++) { - TTableBucket & tb = TableBuckets[bucket]; - tb.KeyIntVals.clear(); - tb.DataIntVals.clear(); - tb.StringsOffsets.clear(); - tb.StringsValues.clear(); - tb.InterfaceValues.clear(); - tb.InterfaceOffsets.clear(); - tb.JoinIds.clear(); - tb.RightIds.clear(); - - TTableBucketStats & tbs = TableBucketsStats[bucket]; - tbs.TuplesNum = 0; - tbs.KeyIntValsTotalSize = 0; - tbs.StringValuesTotalSize = 0; + ClearBucket(bucket); } } +void TTable::ClearBucket(ui64 bucket) { + TTableBucket & tb = TableBuckets[bucket]; + tb.KeyIntVals.clear(); + tb.DataIntVals.clear(); + tb.StringsOffsets.clear(); + tb.StringsValues.clear(); + tb.InterfaceValues.clear(); + tb.InterfaceOffsets.clear(); + tb.JoinIds.clear(); + tb.RightIds.clear(); + + TTableBucketStats & tbs = TableBucketsStats[bucket]; + tbs.TuplesNum = 0; + tbs.KeyIntValsTotalSize = 0; + tbs.StringValuesTotalSize = 0; +} + void TTable::InitializeBucketSpillers(ISpiller::TPtr spiller) { for (size_t i = 0; i < NumberOfBuckets; ++i) { TableBucketsSpillers.emplace_back(spiller, 5_MB); @@ -1154,8 +1156,7 @@ bool TTable::TryToReduceMemoryAndWait() { } } - if (largestBucketSize) return false; - + if (!largestBucketSize) return false; TableBucketsSpillers[largestBucketIndex].SpillBucket(std::move(TableBuckets[largestBucketIndex])); TableBuckets[largestBucketIndex] = TTableBucket{}; @@ -1175,8 +1176,8 @@ void TTable::FinalizeSpilling() { if (!TableBucketsSpillers[bucket].IsInMemory()) { TableBucketsSpillers[bucket].SpillBucket(std::move(TableBuckets[bucket])); TableBuckets[bucket] = TTableBucket{}; + TableBucketsSpillers[bucket].Finalize(); } - TableBucketsSpillers[bucket].Finalize(); } } @@ -1197,7 +1198,8 @@ void TTable::StartLoadingBucket(ui32 bucket) { TableBucketsSpillers[bucket].StartBucketRestoration(); } -void TTable::ExtractBucket(ui64 bucket) { +void TTable::PrepareBucket(ui64 bucket) { + if (!TableBucketsSpillers[bucket].IsExtractionRequired()) return; TableBuckets[bucket] = std::move(TableBucketsSpillers[bucket].ExtractBucket()); } @@ -1282,6 +1284,7 @@ void TTableBucketSpiller::Finalize() { void TTableBucketSpiller::SpillBucket(TTableBucket&& bucket) { MKQL_ENSURE(NextVectorToProcess == ENextVectorToProcess::None, "Internal logic error"); State = EState::Spilling; + IsBucketOwnedBySpiller = true; CurrentBucket = std::move(bucket); NextVectorToProcess = ENextVectorToProcess::KeyAndVals; @@ -1292,6 +1295,7 @@ void TTableBucketSpiller::SpillBucket(TTableBucket&& bucket) { TTableBucket&& TTableBucketSpiller::ExtractBucket() { MKQL_ENSURE(State == EState::InMemory, "Internal logic error"); MKQL_ENSURE(SpilledBucketsCount == 0, "Internal logic error"); + IsBucketOwnedBySpiller = false; return std::move(CurrentBucket); } @@ -1305,6 +1309,10 @@ bool TTableBucketSpiller::IsInMemory() const { return State == EState::InMemory; } +bool TTableBucketSpiller::IsExtractionRequired() const { + return IsBucketOwnedBySpiller; +} + void TTableBucketSpiller::StartBucketRestoration() { MKQL_ENSURE(State == EState::Restoring, "Internal logic error"); MKQL_ENSURE(NextVectorToProcess == ENextVectorToProcess::None, "Internal logic error"); diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h index 23d0d616a621..1f5832518107 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h +++ b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h @@ -117,6 +117,7 @@ class TTableBucketSpiller { bool HasRunningAsyncIoOperation() const; bool IsInMemory() const; + bool IsExtractionRequired() const; private: void ProcessBucketSpilling(); @@ -154,6 +155,8 @@ class TTableBucketSpiller { bool IsFinalizing = false; TTableBucket CurrentBucket; + + bool IsBucketOwnedBySpiller = false; }; @@ -288,8 +291,11 @@ class TTable { // Starts loading spilled bucket to memory. void StartLoadingBucket(ui32 bucket); - // Extracts loaded bucket from spilling. - void ExtractBucket(ui64 bucket); + // Prepares bucket for joining after spilling and restoring back. + void PrepareBucket(ui64 bucket); + + // Clears all the data related to a single bucket + void ClearBucket(ui64 bucket); // Clears table content void Clear();